DjangoProject │── celery_handle │ ├── config.py │ ├── __init__.py │ ├── manage.py │ └── tasks.py
celery原理图
创建Celery
配置Celery参数
在创建celery实例之前,需要对celery的参数进行一些配置。
在这里列出一些比较常用的Celery配置项:
配置项名称 说明 CELERY_DEFAULT_QUEUE 默认的队列名称,当没有为task特别指定队列时,采用此队列 CELERY_BROKER_URL 消息代理,用于发布者传递消息给消费者,推荐RabbitMQ CELERY_RESULT_BACKEND 后端,用于存储任务执行结果,推荐redis CELERY_TASK_SERIALIZER 任务的序列化方式(加密解密) CELERY_RESULT_SERIALIZER 任务执行结果的序列化方式(加密解密) CELERY_ACCEPT_CONTENT CELERYD_CONCURRENCY 任务消费者的并发数 CELERY_TIMEZONE 时区设置,计划任务需要,推荐 Asia/Shanghai CELERY_QUEUES Celery队列设定,为任务指定不同队列 CELERY_ROUTES Celery路由设定,用于给不同的任务指派不同的队列 CELERYBEAT_SCHEDULE Celery计划任务设定
创建config.py配置文件
from kombu import Queue #CELERY_BROKER_URL = redis://127.0.0.1:6379/8 # 中间件 地址 CELERY_RESULT_BACKEND = redis://127.0.0.1:6379/9 # 结果存放地址 #CELERY_TASK_SERIALIZER = msgpack # 任务序列化和反序列化使用msgpack方案 #CELERY_RESULT_SERIALIZER = json # 读取任务结果一般性能要求不高,所以使用了可读性更好的JSON CELERY_TASK_RESULT_EXPIRES = 1200 # celery任务执行结果的超时时间,我的任务都不需要返回结果,只需要正确执行就行 CELERYD_CONCURRENCY = 50 # celery worker的并发数 也是命令行-c指定的数目 CELERYD_PREFETCH_MULTIPLIER = 4 # celery worker 每次去取任务的数量 CELERYD_MAX_TASKS_PER_CHILD = 40 # 每个worker执行了多少任务就会死掉 # CELERY_DEFAULT_QUEUE = "default_queue" # 默认的队列,如果一个消息不符合其他的队列就会放在默认队列里面 # 任务导入 # 这样Celery在启动时,会自动找到这些模块,并导入模块内的task CELERY_IMPORTS = ( celery_handle.tasks, ) # 为Celery设定多个队列 # 创建Queue的实例时,传入name和routing_key,name即队列名称 CELERY_QUEUES = ( Queue(name=email_queue, routing_key=email_router), ) # 为不同的task指派不同的队列(路由) # 那么task名称就是tasks.send_register_active_email # 每个task的value值也是为dict,设定需要指派的队列name,及对应的routing_key # 这里的name和routing_key需要和CELERY_QUEUES设定的完全一致 CELERY_ROUTES = { # 任务函数: {queue: 队列, routing_key: key}, celery_handle.tasks.send_register_active_email: { queue: email_queue, routing_key: email_router, }, } # 设定 Celery 时区 CELERY_TIMEZONE = Asia/Shanghai # CELERYBEAT_SCHEDULE # Celery计划任务设定 后面有描述
创建celery实例
将之前创建的config传递给app,用于载入celery实例。
manage.py
from celery import Celery from celery_handle import config CELERY_BROKER_URL = redis://127.0.0.1:6379/8 # 中间件 地址 # 创建一个Celery类的实例对象 app = Celery(celery_tasks.tasks, broker=config.CELERY_BROKER_URL) # 载入celery配置 app.config_from_object(config)
创建异步任务
在项目的tasks下,定义一些异步任务函数。
对于需要在celery中异步执行的函数,只需要在函数上增加一个装饰器。
创建tasks.py
from __future__ import absolute_import from celery_handle.manage import app from django_redis import get_redis_connection from django.conf import settings from django.core.mail import send_mail # 在worker端加这几句 import os import django # 为celery设置环境变量, 载入Django项目,才能调用该项目的模块 os.environ.setdefault("DJANGO_SETTINGS_MODULE", "项目名.settings") django.setup() # 下面调用Django项目模块 import utils # 定义任务函数 # name可以在这里指定队列 # @app.task(name="send_email") @app.task() def send_register_active_email(to_email, username, token): 发送激活邮件 subject = message = sender = settings.EMAIL_FROM receiver = [to_email] html_message = send_mail(subject, message, sender, receiver, html_message=html_message)
异步执行任务
对于以app.task()装饰的函数,可以同步运行,也可以异步运行。
接上一个例子的send_register_active_email函数。
同步执行时,直接调用这个函数即可。
本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/293020.html