Flask-Celery-Helper

Overview

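  • An extension for using Celery with Flask. The Flask documentation says no extension is needed, but large projects built around an application factory need an init_app function, so this extension is recommended; its author gives the same reasoning.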
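
The reason init_app matters with an application factory can be sketched without Flask or Celery at all. The example below is a minimal illustration, not the extension's real implementation: DeferredExtension and FakeApp are hypothetical stand-ins, and only the CELERY_BROKER_URL key is taken from the configuration shown later in this page.

```python
class FakeApp:
    """Hypothetical minimal application object holding a config dict."""

    def __init__(self, config):
        self.config = dict(config)


class DeferredExtension:
    """Hypothetical stand-in for an extension object such as Mail() or Celery()."""

    def __init__(self, app=None):
        self.app = None
        self.broker_url = None
        # Support both styles: Extension(app) and Extension().init_app(app).
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        # Bind to the application only once it actually exists.
        self.app = app
        self.broker_url = app.config["CELERY_BROKER_URL"]


# Module level: created at import time, before any application exists.
ext = DeferredExtension()


def create_app(config):
    app = FakeApp(config)
    ext.init_app(app)  # Binding happens inside the factory.
    return app
```

Because the extension object is created at import time, other modules can import it and decorate functions with it before create_app() has run.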

Installation

  • Redis (Python client): pip install redis==3.4.1
  • Celery: pip install celery==4.4.1
  • Flask-Celery-Helper: pip install flask-celery-helper

Documentation

Usage

  • Directory structure
project/
├── manage.py
└── app/
    ├── __init__.py
    ├── config.py
    ├── tasks.py
    ├── extensions.py
    ├── templates/
    │   └── activate.html
    └── views/
        ├── __init__.py
        ├── main.py
        └── user.py
  • manage.py
from flask_script import Manager
from app import create_app
# Imported at module level so the worker can load it with `celery -A manage.celery`
from app.extensions import celery

app = create_app()
manager = Manager(app)

if __name__ == '__main__':
    manager.run()
  • app/__init__.py
from flask import Flask
from app.extensions import config_extensions
from app.views import register_blueprint
from app.config import Config

def create_app():
    # Create the application
    app = Flask(__name__)
    # Load the configuration
    app.config.from_object(Config)
    # Initialize the extensions
    config_extensions(app)
    # Register the blueprints
    register_blueprint(app)
    # Return the application
    return app
  • app/config.py
import os

class Config:
    MAIL_SERVER = 'smtp.qq.com'
    MAIL_PORT = 465
    MAIL_USE_SSL = True
    MAIL_USERNAME = '1234567890@qq.com'
    MAIL_PASSWORD = os.getenv('MAIL_PASSWORD', '123456')
    CELERY_BROKER_URL = 'redis://localhost:6379/2'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
    SERVER_NAME = '127.0.0.1:5000'
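
Both CELERY_BROKER_URL and CELERY_RESULT_BACKEND above point at database 2 of a local Redis server. The following sketch uses only the standard library to show how such a URL breaks down; describe_redis_url is a hypothetical helper written for illustration and is not part of Celery or redis-py.

```python
from urllib.parse import urlparse


def describe_redis_url(url):
    """Split a redis:// URL into its parts (illustrative helper only)."""
    parts = urlparse(url)
    return {
        "scheme": parts.scheme,
        "host": parts.hostname,
        "port": parts.port,
        # The path component selects the Redis database number.
        "db": int(parts.path.lstrip("/") or 0),
    }


info = describe_redis_url("redis://localhost:6379/2")
print(info)
```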
  • app/extensions.py
from flask_mail import Mail
from flask_celery import Celery

mail = Mail()
celery = Celery()

def config_extensions(app):
    mail.init_app(app)
    celery.init_app(app)
  • app/tasks.py
from app.extensions import mail
from flask_mail import Message
from flask import current_app, render_template
from app.extensions import celery

@celery.task
def add(a, b):
    from time import sleep
    sleep(5)
    return a + b

@celery.task
def async_send_mail(subject, to, template, **kwargs):
    if isinstance(to, list):
        recipients = to
    elif isinstance(to, str):
        recipients = to.split(',')
    else:
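        raise TypeError('Recipients must be a list or a comma-separated string')
    # Render the email template to get the message body
    html = render_template(template, **kwargs)
    # Create the email message object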
    msg = Message(
        subject=subject,
        recipients=recipients,
        html=html,
        sender=current_app.config['MAIL_USERNAME']
    )
    with current_app.app_context():
        mail.send(msg)
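
The recipient handling at the top of async_send_mail can be pulled out and checked on its own, without Flask, Celery, or a mail server. This is a sketch under assumptions: normalize_recipients is a hypothetical helper name, and unlike the task above it also strips whitespace and drops empty entries.

```python
def normalize_recipients(to):
    """Return a list of addresses from a list or a comma-separated string."""
    if isinstance(to, list):
        return to
    if isinstance(to, str):
        # Extension over the original: trim spaces and skip empty items.
        return [addr.strip() for addr in to.split(",") if addr.strip()]
    raise TypeError("Recipients must be a list or a comma-separated string")


print(normalize_recipients("a@example.com, b@example.com"))
```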
  • app/templates/activate.html
<h1>Hello {{username}}!</h1>
<p>To activate your account, click this link: <a href="{{ url_for('user.activate', token=token, _external=True) }}">Activate</a></p>
  • app/views/__init__.py
from .main import main
from .user import user

# Blueprint configuration: (blueprint, URL prefix) pairs
DEFAULT_BLUEPRINT = (
    (main, ''),
    (user, '/user'),
)

# Helper that registers every blueprint on the application
def register_blueprint(app):
    for blueprint, url_prefix in DEFAULT_BLUEPRINT:
        app.register_blueprint(blueprint, url_prefix=url_prefix)
  • app/views/main.py
from flask import Blueprint
from app.tasks import add

main = Blueprint('main', __name__)

@main.route('/')
def index():
    return 'Celery test'

@main.route('/add/')
def ad():
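    # delay() queues the task and returns immediately with an AsyncResult
    s = add.delay(2, 3)
    # Printing the AsyncResult shows the task id, not the sum
    print(s)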
    return 'add complete'
  • app/views/user.py
from flask import Blueprint
from app.tasks import async_send_mail

user = Blueprint('user', __name__)

@user.route('/register/')
def send():
    async_send_mail.delay(
        subject='celery test',
        to='jerry625731716@163.com',
        template='activate.html',
        username='xxx', token=10
    )
    return 'Registration successful. Please click the link in the email to complete activation.'

# Activation route: the token should be an opaque value; never put plaintext information in it
@user.route('/activate/<token>/')
def activate(token):
    return 'User {} activated successfully'.format(token)

Testing

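  • Start the application: python manage.py runserver -d -r --threaded
  • Start Celery: celery -A manage.celery worker --loglevel=info
  • Start Redis: the Celery broker and result backend in the configuration both depend on it
  • Asynchronous addition: http://127.0.0.1:5000/add/
  • Send an email: http://127.0.0.1:5000/user/register/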
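
Since the worker cannot connect to its broker unless Redis is running, it can help to confirm that the Redis port is reachable before starting the other processes. The sketch below is a plain TCP check using the standard library; port_open is a hypothetical helper, and the host and port are taken from the redis://localhost:6379/2 URL in the configuration. It only shows that something is listening on the port, not that it is a healthy Redis server.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False
```

For the configuration used here, port_open("localhost", 6379) should return True once Redis is up.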
