博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Python之celery
阅读量:5130 次
发布时间:2019-06-13

本文共 4148 字,大约阅读时间需要 13 分钟。

一、celery简介

  Celery是一个Python开发的异步分布式任务调度模块。celery本身不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbing, redis, 数据库等。

  broker是一个消息传输的中间件,可以理解为一个邮箱。每当应用程序调用celery的异步任务的时候,会向broker传递消息,而后celery的worker将会取到消息,进行对于的程序执行。好吧,这个邮箱可以看成是一个消息队列。那么什么又是backend,通常程序发送的消息,发完就完了,可能都不知道对方时候接受了。为此,celery实现了一个backend,用于存储这些消息以及celery执行的一些消息和结果。对于 brokers,官方推荐是rabbitmq和redis,至于backend,就是数据库啦。为了简单起见,我们都用redis。

  redis连接URL的格式为:

  redis://password@hostname:port/db_number

二、celery小例子

  1.在Linux上要先启动redis(./redis-server &)。

  2.程序代码如下:

#!/usr/bin/env python# -*- coding:utf-8 -*-# @Time    :2017/12/19 15:12# @Author  :huangdongju# @File    :17-2.pyfrom celery import Celerybroker = "redis://192.168.203.12:6379/5"  #5代表第五个数据库backend = "redis://192.168.203.12:6379/6"app = Celery("17-2",broker=broker,backend=backend)@app.task   #注册到任务中去def add(x,y):    return x + yre = add.delay(10,20)print (re.result)print (re.ready)print (re.get(timeout = 1))print (re.status)

  3.将worker放置在Linux上,在Linux上进行如下配置(Python的版本至少为2.7.0以上)。

  # pip install redis

  # pip install celery

  # cd /usr/local/src

  # mkdir celery

  # cd celery

  # vim 17-2.py  (要与之前的文件名相同)

  # celery -A 17-2 worker - l info

     4.结果

 在Linux上输出的日志为:

 三、celery多实例

  celery可以支持多台不通的计算机执行不同的任务或相同的任务。如果要说celery的分布式应用的的话,那就是celery的消息路由机制,就要提一下AMQP协议。具体的可以查看AMQP文档。简单来说就是可以有多个消息队列(Message Queue),不同的消息可以指定发给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routing key ,Exchange通过routing key来把消息路由(routes)到不同的Message Queue中去。实例代码如下:

#!/usr/bin/env python# -*- coding:utf-8 -*-# @Time    :2017/12/20 16:19# @Author  :huangdongju# @File    :demon3.pyimport timefrom celery import Celeryapp = Celery()app.config_from_object("celeryconfig")@app.taskdef taskA(x,y):    return x*y@app.taskdef taskB(x,y,z):    return x+y+z@app.taskdef add(x,y):    return x+y #!/usr/bin/env python # -*- coding:utf-8 -*- # @Time    :2017/12/20 9:44 # @Author  :huangdongju # @File    :celeryconfig.py from kombu import Queue, Exchange BROKER_URL = "redis://192.168.203.12:6379/1" CELERY_RESULT_BACKEND = "redis://192.168.203.12:6379/2" #要大写 CELERY_QUEUES = { #配置消息队列 Queue("default",Exchange("default"),routing_key = "default"), Queue("for_task_A",Exchange("for_task_A"),routing_key = "for_task_A"), Queue("for_task_B",Exchange("for_task_B"),routing_key = "for_task_B") } CELERY_ROUTES ={ "demon3.taskA":{ "queue":"for_task_A","routing_key":"for_task_A"}, "demon3.taskB":{ "queue":"for_task_B","routing_key":"for_task_B"} }

  在服务器上要同步demon3.py与celeryconfig.py这两个文件。然后打开两个进程:

celery -A demon3 worker -l info -n workerA.%h -Q for_task_Acelery -A demon3 worker -l info -n workerB.%h -Q for_task_B

结果如下:

200

SUCCESS
6
SUCCESS
None
PENDING

  在上述结果中,看到状态有为PENDING,表示没有执行,这是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。

 

四、celery与定时任务

  在celery中执行定时任务非常简单,只需要设置celery对象中的CELERY_SCHEDULE变量。实例代码为:

#!/usr/bin/env python# -*- coding:utf-8 -*-# @Time    :2017/12/20 16:19# @Author  :huangdongju# @File    :demon3.pyimport timefrom celery import Celeryapp = Celery()app.config_from_object("celeryconfig")@app.taskdef taskA(x,y):    return x*y@app.taskdef taskB(x,y,z):    return x+y+z@app.taskdef add(x,y):    return x+y
#!/usr/bin/env python # -*- coding:utf-8 -*- # @Time    :2017/12/20 9:44 # @Author  :huangdongju # @File    :celeryconfig.py from kombu import Queue, Exchange BROKER_URL = "redis://192.168.203.12:6379/1" CELERY_RESULT_BACKEND = "redis://192.168.203.12:6379/2" #要大写 CELERY_QUEUES = { #配置消息队列 Queue("default",Exchange("default"),routing_key = "default"), Queue("for_task_A",Exchange("for_task_A"),routing_key = "for_task_A"), Queue("for_task_B",Exchange("for_task_B"),routing_key = "for_task_B") } CELERY_ROUTES ={ "demon3.taskA":{ "queue":"for_task_A","routing_key":"for_task_A"}, "demon3.taskB":{ "queue":"for_task_B","routing_key":"for_task_B"} } #定时任务 CELERY_TIMEZONE = 'UTC' CELERY_SCHEDULE = { 'taskA_schedule':{ 'task':'demon3.taskA', 'schedule':20, 'args':(5,6) }, 'taskB_schedule':{ 'task':'demon3.taskB', 'schedule':50, 'args':(100,200,300) }, 'add_schedule':{ 'task':'demon3.add', 'schedule':10, 'args':(110,120) }, }

 

转载于:https://www.cnblogs.com/huangdongju/p/8066796.html

你可能感兴趣的文章
hdu4180 数论
查看>>
Vue(一)创建第一个Vue程序
查看>>
P3369 【模板】普通平衡树
查看>>
PHP100-第四讲 PHP5.4 运算符、流程控制
查看>>
build/core/config.mk
查看>>
Python--学习笔记5 numpy
查看>>
软工网络15个人作业1
查看>>
关于CMD/AMD和Common.js/Sea.js/Require.js
查看>>
Flask RESTful API搭建笔记
查看>>
【啊哈!算法】之四、选择排序
查看>>
棋牌游戏服务器架构: 总体设计
查看>>
Day-2:检索数据
查看>>
HDU - 5828 Rikka with Sequence (线段树)
查看>>
利用vcard和qrcode.js生成二维码导入联系人
查看>>
ASP.NET SessionState 解惑
查看>>
find 命令
查看>>
diy操作系统 0:万事开头难
查看>>
正则表达式------捕获性分组,非捕获性分组,前瞻,后瞻
查看>>
HTML颜色名
查看>>
Objective中的协议(Protocol)
查看>>