Celery基础

五十岚2022年10月12日大约 3 分钟

Celery Task

Celery 是一个简单、灵活且可靠的,处理大量消息的分布式系统,

  • 专注于实时处理的异步任务队列
  • 支持任务调度等定时任务

一、celery 概述:

1.celery 组成:

Celery 的架构由三部分组成,消息中间件(message broker),任务执行单元(worker)和任务执行结果存储(task result store)组成。

  • 消息中间件
    • Celery 本身不提供消息服务,但是可以方便的和第三方提供的消息中间件集成。包括,RabbitMQ, Redis 等等
  • 任务执行单元
    • Worker 是 Celery 提供的任务执行的单元,worker 并发的运行在分布式的系统节点中。
  • 任务结果存储
    • Task result store 用来存储 Worker 执行的任务的结果,Celery 支持以不同方式存储任务的结果,包括 AMQP、redis、rbmq 等

【另外】: Celery 还支持不同的并发和序列化的手段

并发:Prefork, Eventlet, gevent, threads/single threaded 序列化:pickle, json, yaml, msgpack. zlib, bzip2 compression, Cryptographic message signing 等等

2.使用场景:

celery 是一个强大的 分布式任务队列的异步处理框架,它可以让任务的执行完全脱离主程序,甚至可以被分配到其他主机上运行。我们通常使用它来实现异步任务(async task)和定时任务(crontab)。

  • 异步任务:将耗时操作任务提交给 Celery 去异步执行,比如发送短信/邮件、消息推送、音视频处理等等

  • 定时任务:定时执行某件事情,比如每天数据统计

3. 安装:

pip install -U Celery   or  sudo easy_install Celery

4.执行流程:

user -> 初始化celery 定好borker(消息中间件,队列/redis)、定好 backend(存储位置) -> 执行任务(celery执行并将结果写入中间件)

二、celery 创建任务:

1.创建简单任务

import celery
import time

backend = 'redis://172.16.128.29:6379/0'
broker = 'redis://172.16.128.29:6379/1'
cel = celery.Celery('test', broker=broker, backend=backend)

@cel.task
def send_email(name):
    print("向%s发送邮件..." % name)
    time.sleep(1)
    print("向%s发送邮件完成" % name)
    return "ok"

2. 通过引入单独配置文件,创建任务

将上文简单任务部分改为如下:

cel = celery.Celery('test')
cel.config_from_object('celery_test.celery_config')

将配置文件 celery_config.py 引入,路径应该为项目根目录后的绝对路径

配置项:

celery 4.0 以上支持小写的简化配置项:(通常如下)

broker_url = "redis://172.16.128.29:6379/0"
result_backend = "redis://172.16.128.29:6379/1"

timezone = 'Asia/Shanghai'  # 指定时区,默认是 UTC

task_serializer = 'json'  # 任务序列化和反序列化使用pickle方案
result_serializer = 'json'  # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON
result_expires = 60 * 60 * 24  # 任务过期时间,不建议直接写86400,应该让这样的magic数字表述更明显
accept_content = ['json']  # 指定接受的内容类型

imoprt = (
    # 指定导入的任务模块
    'celery_test.celery_task'
)

三、celery 启动任务

1. 直接终端执行命令:

celery -A tasks worker --loglevel=info
or
celery worker -A tasks -l info

执行以上命令,将任务启动

【注意】:

windows 下 celery 若命令行执行报错如下:

ERROR/MainProcess] Task handler raised error: ValueError('not enough values to unpack (expected 3, got 0)')

需要安装 eventlet 才行(原因不明)

pip3 install eventlet

然后运行命令需携带参数 -P eventlet:

celery worker -A celery_task -l info -P eventlet

2.写脚本执行:

建立一个 task_worker.py 文件来启动任务

import sys
import os
from celery_test.celery_task import cel

root_path = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
sys.path.insert(0, os.path.dirname(__file__))

sys.argv.extend(["-l", "INFO", "-P", "solo"])
cel.worker_main()
  • 需要配置对应的正确路径,来引入相关的 celery_app

四、创建 celery 生产者

from celery_task import send_email
result = send_email.delay("yuan")
print(result.id)
result2 = send_email.delay("alex")
print(result2.id) 

五、获取任务执行结果

1.通过 AsyncResult 进行简单判断:

from celery.result import AsyncResult
from celery_task import cel

async_result=AsyncResult(id="c6ddd5b7-a662-4f0e-93d4-ab69ec2aea5d", app=cel)

if async_result.successful():
    result = async_result.get()
    print(result)
    # result.forget() # 将结果删除
elif async_result.failed():
    print('执行失败')
elif async_result.status == 'PENDING':
    print('任务等待中被执行')
elif async_result.status == 'RETRY':
    print('任务异常后正在重试')
elif async_result.status == 'STARTED':
    print('任务已经开始被执行')
上次编辑于: 2022/10/12 14:58:07