Flask-Celery-Helper
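Description
- An extension library for using Celery inside Flask. The Flask documentation says no extension is required, but large projects built around an application factory need an init_app function, so this extension is recommended; its author makes the same point.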
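The factory-plus-init_app arrangement described above can be illustrated with a toy example in plain Python. This is only a sketch of the pattern: `FakeApp` and `FakeExtension` are hypothetical stand-ins invented for illustration, not classes from Flask, Celery, or Flask-Celery-Helper.

```python
# Toy illustration of the deferred-initialization ("init_app") pattern.
# FakeApp and FakeExtension are hypothetical stand-ins, not real library classes.


class FakeApp:
    """Stands in for a Flask application: a name plus a config dict."""

    def __init__(self, name, config):
        self.name = name
        self.config = dict(config)
        self.extensions = {}


class FakeExtension:
    """Created at import time without an app; bound later via init_app()."""

    def __init__(self, app=None):
        self.broker_url = None
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        # Read settings from the app only once the app actually exists.
        self.broker_url = app.config["CELERY_BROKER_URL"]
        app.extensions["fake"] = self


# Module level: no application exists yet, so nothing is configured.
ext = FakeExtension()


def create_app(config):
    """Application factory: build the app, then bind the extension to it."""
    app = FakeApp("demo", config)
    ext.init_app(app)
    return app


app = create_app({"CELERY_BROKER_URL": "redis://localhost:6379/2"})
print(ext.broker_url)
```

The extension object can be imported by other modules before any application exists, which is what lets task modules decorate functions at import time while configuration is deferred to the factory.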
Installation
- Redis: pip install redis==3.4.1
- Celery: pip install celery==4.4.1
- Flask-Celery-Helper: pip install flask-celery-helper
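After installing, it can help to confirm which versions actually ended up in the environment. The following is a minimal sketch using only the standard library (Python 3.8+); the helper name `installed_versions` is made up for this example, and the distribution names are assumed to match the pip commands above.

```python
from importlib import metadata


def installed_versions(names):
    """Map each distribution name to its installed version, or None if absent."""
    versions = {}
    for name in names:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


# Distribution names assumed from the pip commands in the installation list.
report = installed_versions(["redis", "celery", "Flask-Celery-Helper"])
for name, version in report.items():
    print(f"{name}: {version or 'not installed'}")
```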
Documentation
Usage
project/
├── manage.py
└── app/
    ├── __init__.py
    ├── config.py
    ├── tasks.py
    ├── extensions.py
    ├── templates/
    │   └── activate.html
    └── views/
        ├── __init__.py
        ├── main.py
        └── user.py
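- manage.py
from flask_script import Manager
from app import create_app
# Imported so the worker can locate the instance via `celery -A manage.celery`.
from app.extensions import celery  # noqa: F401

# Build the application through the factory; this also binds celery to it.
app = create_app()
manager = Manager(app)

if __name__ == '__main__':
    manager.run()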
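- app/__init__.py
from flask import Flask
from app.extensions import config_extensions
from app.views import register_blueprint
from app.config import Config


def create_app():
    # Create the application instance
    app = Flask(__name__)
    # Load the configuration
    app.config.from_object(Config)
    # Initialize the extensions (mail, celery)
    config_extensions(app)
    # Register the blueprints
    register_blueprint(app)
    # Return the configured application
    return app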
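- app/config.py
import os


class Config:
    # Mail settings (SSL on port 465)
    MAIL_SERVER = 'smtp.qq.com'
    MAIL_PORT = 465
    MAIL_USE_SSL = True
    MAIL_USERNAME = '1234567890@qq.com'
    # Read the password from the environment; '123456' is only a placeholder
    MAIL_PASSWORD = os.getenv('MAIL_PASSWORD', '123456')
    # Celery broker and result backend both use Redis database 2
    CELERY_BROKER_URL = 'redis://localhost:6379/2'
    CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
    # Lets url_for(..., _external=True) build URLs outside a request, e.g. in a task
    SERVER_NAME = '127.0.0.1:5000'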
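The trailing `/2` in the two Redis URLs above selects Redis database number 2. As a rough illustration of how such a URL breaks down, here is a small standard-library sketch; `describe_redis_url` is a hypothetical helper written for this note, not part of Celery or redis-py.

```python
from urllib.parse import urlparse


def describe_redis_url(url):
    """Split a redis:// URL into host, port, and database number."""
    parts = urlparse(url)
    if parts.scheme != "redis":
        raise ValueError(f"not a redis:// URL: {url!r}")
    # The path component ("/2") selects the Redis database; empty means 0.
    db = int(parts.path.lstrip("/") or 0)
    # 6379 is the default Redis port when the URL omits one.
    return {"host": parts.hostname, "port": parts.port or 6379, "db": db}


info = describe_redis_url("redis://localhost:6379/2")
print(info)
```

Because the broker and the result backend share the same database here, queued messages and stored results live side by side; pointing them at different database numbers is one way to keep them apart.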
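- app/extensions.py
from flask_mail import Mail
from flask_celery import Celery

# Create the extension objects without an application
mail = Mail()
celery = Celery()


def config_extensions(app):
    # Bind each extension to the application created by the factory
    mail.init_app(app)
    celery.init_app(app)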
- app/tasks.py
from time import sleep

from flask import current_app, render_template
from flask_mail import Message

from app.extensions import celery, mail


@celery.task
def add(a, b):
    # Simulate a slow job
    sleep(5)
    return a + b


@celery.task
def async_send_mail(subject, to, template, **kwargs):
    # Accept either a list of addresses or a comma-separated string
    if isinstance(to, list):
        recipients = to
    elif isinstance(to, str):
        recipients = to.split(',')
    else:
        raise TypeError('Invalid mail recipient format')
    # Render the mail body; url_for(..., _external=True) in the template relies on SERVER_NAME
    html = render_template(template, **kwargs)
    # Build the message
    msg = Message(
        subject=subject,
        recipients=recipients,
        html=html,
        sender=current_app.config['MAIL_USERNAME']
    )
    # Send the mail inside an application context
    with current_app.app_context():
        mail.send(msg)
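The recipient handling in async_send_mail can also be pulled out into a small function that is easy to test on its own. The sketch below is one possible variant, not the original code: the name `normalize_recipients` is hypothetical, and unlike the task above it also accepts tuples, strips whitespace, and drops empty entries.

```python
def normalize_recipients(to):
    """Return a list of addresses from a list/tuple or a comma-separated string."""
    if isinstance(to, str):
        candidates = to.split(",")
    elif isinstance(to, (list, tuple)):
        candidates = list(to)
    else:
        raise TypeError("recipients must be a list or a comma-separated string")
    # Strip whitespace and drop empty entries, such as those left by a trailing comma.
    return [addr.strip() for addr in candidates if addr and addr.strip()]


print(normalize_recipients("a@example.com, b@example.com"))
```

Keeping this logic free of Flask and Celery imports means it can be checked without a running worker or mail server.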
- app/templates/activate.html
<h1>Hello {{username}}!</h1>
<p>To activate your account, click the link on the right: <a href="{{ url_for('user.activate', token=token, _external=True) }}">Activate</a></p>
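- app/views/__init__.py
from .main import main
from .user import user

# Each entry pairs a blueprint with its URL prefix
DEFAULT_BLUEPRINT = (
    (main, ''),
    (user, '/user'),
)


def register_blueprint(app):
    # Register every blueprint on the application
    for blueprint, url_prefix in DEFAULT_BLUEPRINT:
        app.register_blueprint(blueprint, url_prefix=url_prefix)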
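- app/views/main.py
from flask import Blueprint
from app.tasks import add

main = Blueprint('main', __name__)


@main.route('/')
def index():
    return 'Celery test'


@main.route('/add/')
def ad():
    # delay() queues the task and returns an AsyncResult immediately
    s = add.delay(2, 3)
    # Printing the AsyncResult shows the task id
    print(s)
    return 'add complete'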
- app/views/user.py
from flask import Blueprint
from app.tasks import async_send_mail

user = Blueprint('user', __name__)


@user.route('/register/')
def send():
    # Queue the activation mail; the worker renders and sends it
    async_send_mail.delay(
        subject='celery test',
        to='jerry625731716@163.com',
        template='activate.html',
        username='xxx', token=10
    )
    return 'Registration successful; please click the link in the email to activate your account'


@user.route('/activate/<token>/')
def activate(token):
    return 'User {} activated successfully'.format(token)
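Testing
- Start the application: python manage.py runserver -d -r --threaded
- Start Celery: celery -A manage.celery worker --loglevel=info
- Start Redis: required because the configuration points Celery's broker and result backend at Redis
- Asynchronous addition: http://127.0.0.1:5000/add/
- Send mail: http://127.0.0.1:5000/user/register/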
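The two test URLs can also be requested from a script instead of a browser. The following is a minimal standard-library sketch under the assumption that the application, the Celery worker, and Redis are already running; `fetch_status` and `smoke_test` are hypothetical helper names, and nothing is requested until `smoke_test` is called.

```python
from urllib.request import urlopen

BASE_URL = "http://127.0.0.1:5000"
PATHS = ["/add/", "/user/register/"]


def fetch_status(url, timeout=5):
    """Return the HTTP status code for a GET request to url."""
    with urlopen(url, timeout=timeout) as response:
        return response.status


def smoke_test(base_url, paths, fetch=fetch_status):
    """Request each path and return {url: status code or error message}."""
    results = {}
    for path in paths:
        url = base_url.rstrip("/") + path
        try:
            results[url] = fetch(url)
        except OSError as exc:  # URLError and HTTPError are OSError subclasses
            results[url] = f"error: {exc}"
    return results
```

Calling `smoke_test(BASE_URL, PATHS)` issues the real requests; note that hitting /user/register/ queues an actual mail-sending task. The `fetch` parameter exists so the function can be exercised with a stub when no server is available.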