Flask-Celery-Helper
说明
- 专门在Flask中使用celery中的扩展库,Flask文档中说不需要使用,但是大型项目中使用工工厂函数时需要
init_app
函数,所以推荐使用该扩展,作者也是这么说的。
安装
Redis
:pip install redis==3.4.1
Celery
:pip install celery==4.4.1
pip install flask-celery-helper
文档
使用
project/
├── manage.py
└── app/
├── __init__.py
├── config.py
├── tasks.py
├── extensions.py
├── templates/
│ └── activate.html
└── views
├── __init__.py
├── main.py
└── user.py
from flask_script import Manager
from app import create_app
from app.extensions import celery
app = create_app()
manager = Manager(app)
if __name__ == '__main__':
manager.run()
from flask import Flask
from app.extensions import config_extensions
from app.views import register_blueprint
from app.config import Config
def create_app():
app = Flask(__name__)
app.config.from_object(Config)
config_extensions(app)
register_blueprint(app)
return app
import os
class Config:
MAIL_SERVER = 'smtp.qq.com'
MAIL_PORT = 465
MAIL_USE_SSL = True
MAIL_USERNAME = '1234567890@qq.com'
MAIL_PASSWORD = os.getenv('MAIL_PASSWORD', '123456')
CELERY_BROKER_URL = 'redis://localhost:6379/2'
CELERY_RESULT_BACKEND = 'redis://localhost:6379/2'
SERVER_NAME = '127.0.0.1:5000'
from flask_mail import Mail
from flask_celery import Celery
mail = Mail()
celery = Celery()
def config_extensions(app):
mail.init_app(app)
celery.init_app(app)
from app.extensions import mail
from flask_mail import Message
from flask import current_app, render_template
from app.extensions import celery
@celery.task
def add(a, b):
from time import sleep
sleep(5)
return a + b
@celery.task
def async_send_mail(subject, to, template, **kwargs):
if isinstance(to, list):
recipients = to
elif isinstance(to, str):
recipients = to.split(',')
else:
raise Exception('邮件接收者格式有误')
html = render_template(template, **kwargs)
msg = Message(
subject=subject,
recipients=recipients,
html=html,
sender=current_app.config['MAIL_USERNAME']
)
with current_app.app_context():
mail.send(msg)
app/templates/activate.html
<h1>Hello {{username}}!</h1>
<p>账户激活请点击右边连接<a href="{{ url_for('user.activate', token=token, _external=True) }}">激活</a></p>
from .main import main
from .user import user
DEFAULT_BLUEPRINT = (
(main, ''),
(user, '/user'),
)
def register_blueprint(app):
for blueprint, url_prefix in DEFAULT_BLUEPRINT:
app.register_blueprint(blueprint, url_prefix=url_prefix)
from flask import Blueprint
from app.tasks import add
main = Blueprint('main', __name__)
@main.route('/')
def index():
return 'Celery测试'
@main.route('/add/')
def ad():
s = add.delay(2, 3)
print(s)
return 'add complete'
from flask import Blueprint
from app.tasks import async_send_mail
user = Blueprint('user', __name__)
@user.route('/register/')
def send():
async_send_mail.delay(
subject='celery test',
to='jerry625731716@163.com',
template='activate.html',
username='xxx', token=10
)
return '注册成功,请点击邮件链接完成激活'
@user.route('/activate/<token>/')
def activate(token):
return '用户{}激活成功'.format(token)
测试
- 启动程序:
python manage.py runserver -d -r --threaded
- 启动
Celery
:celery -A manage.celery worker --loglevel=info
- 启动
Redis
:因为配置中Celery
依赖Redis
- 异步加法:
http://127.0.0.1:5000/add/
- 邮件发送:
http://127.0.0.1:5000/user/register/