Flask-JWT-Extended

说明

  • Flask-JWT-Extended 是一个为 Flask 应用提供 JWT(JSON Web Token)认证支持的扩展,功能比 Flask-JWT 更加强大

安装

  • pip install flask-jwt-extended

文档

  • https://flask-jwt-extended.readthedocs.io/

使用

from flask_jwt_extended import JWTManager

# Create the JWT manager first, then attach it to the application in a
# separate step via init_app.
# NOTE(review): `app` is not defined in this snippet; it is presumably the
# Flask application object created elsewhere -- confirm.
jwt = JWTManager()
jwt.init_app(app)

# Obtain tokens (log in)
@auth.route('/', methods=['POST'])
def login():
    """Authenticate a user and issue an access/refresh token pair.

    Expects a JSON object body with ``username`` and ``password``.

    Returns:
        A ``(response, status)`` tuple:
        * 401 / code ``40101`` when the username or password is missing
          (including when the body is absent or is not a JSON object);
        * 401 / code ``40102`` when no user with NORMAL status matches;
        * 401 / code ``40103`` when the password does not verify;
        * 201 / code ``20100`` with ``access_token`` and ``refresh_token``
          on success.
    """
    # A missing, malformed, or non-object JSON body must yield the regular
    # "missing credentials" answer instead of an AttributeError (HTTP 500).
    payload = request.get_json(silent=True)
    if not isinstance(payload, dict):
        payload = {}

    username = payload.get('username')
    password = payload.get('password')
    if not username or not password:
        return jsonify(code='40101', msg='缺少用户名或密码'), 401

    user = User.query.filter_by(username=username, status=UserStatus.NORMAL).first()
    if not user:
        return jsonify(code='40102', msg='无效的用户名'), 401
    elif not user.verify_password(password):
        return jsonify(code='40103', msg='无效的密码'), 401

    # Generate the tokens
    access_token = create_access_token(identity=user.id)
    refresh_token = create_refresh_token(identity=user.id)

    # Cache each token's jti with the flag 'false' (not revoked); the cache
    # timeout is taken from the matching JWT_*_TOKEN_EXPIRES setting.
    cache.set(get_jti(access_token), 'false',
              current_app.config['JWT_ACCESS_TOKEN_EXPIRES'])
    cache.set(get_jti(refresh_token), 'false',
              current_app.config['JWT_REFRESH_TOKEN_EXPIRES'])

    # Return the tokens
    return jsonify(
        code='20100',
        msg='登录成功',
        access_token=access_token,
        refresh_token=refresh_token
    ), 201

# Log out
@auth.route('/', methods=['DELETE'])
# Protect this route
@jwt_required
def logout():
    """Revoke the current access token and drop the user's cached data.

    The token's jti is re-cached with the flag 'true' (blacklisted), and the
    cache entry keyed by the stringified identity is deleted.

    Returns:
        A JSON response carrying only a success message.
    """
    # Blacklist the token that authenticated this request
    revoked_jti = get_raw_jwt()['jti']
    blacklist_ttl = current_app.config['JWT_ACCESS_TOKEN_EXPIRES']
    cache.set(revoked_jti, 'true', blacklist_ttl)

    # Remove the cached data stored under the user's identity
    identity_key = str(get_jwt_identity())
    cache.delete(identity_key)

    return jsonify(msg='退出登录成功')

# Intercept every request through the application-wide hook decorator
@auth.before_app_request
def authentication():
    """Require a valid JWT on every request unless the request is exempt.

    Exempt requests (the hook returns ``None`` so handling continues):
      * any OPTIONS request;
      * a path listed in ``app_config['URL_WHITE_LIST']`` when the request
        method is among the methods listed for that path;
      * paths starting with ``/swagger`` and the root path ``/``.

    A whitelisted path requested with a method that is NOT listed still goes
    through token verification, even if it would match the Swagger/root rule.
    """
    # All OPTIONS requests are let through
    if request.method == 'OPTIONS':
        return
    # Route whitelist: skip authentication for the listed methods
    elif request.path in app_config['URL_WHITE_LIST']:
        if request.method in app_config['URL_WHITE_LIST'][request.path]:
            return
    # Swagger API documentation requests
    elif request.path.startswith('/swagger') or request.path == '/':
        return

    # Everything else is verified by calling an empty function wrapped in
    # jwt_required. The decorator must sit at the same indentation as the
    # def it decorates.
    @jwt_required
    def verify_token():
        pass

    verify_token()

# Refresh the access token
@auth.route('/', methods=['PUT'])
@jwt_refresh_token_required
def refresh():
    """Issue a new access token for the identity in the refresh token.

    The new token's jti is cached with the flag 'false' (not revoked) using
    the JWT_ACCESS_TOKEN_EXPIRES setting as the cache timeout.

    Returns:
        ``(response, 201)`` with code ``20100`` and the new ``access_token``.
    """
    new_token = create_access_token(identity=get_jwt_identity())
    new_jti = get_jti(new_token)
    ttl = current_app.config['JWT_ACCESS_TOKEN_EXPIRES']
    cache.set(new_jti, 'false', ttl)

    body = jsonify(code='20100', access_token=new_token)
    return body, 201

# No token was supplied with the request
@jwt.unauthorized_loader
def unauthorized_callback(e):
    """Build the 401 response for a request that carries no token.

    Args:
        e: Value passed in by the loader; forwarded unchanged as ``msg``.

    Returns:
        ``(response, 401)`` with code ``40104``.
    """
    body = jsonify(code='40104', msg=e)
    return body, 401

# Token has expired
@jwt.expired_token_loader
def expired_token_callback(expired_token):
    """Build the 401 response for an expired token.

    Args:
        expired_token: Mapping for the expired token; its ``'type'`` entry
            selects the application code.

    Returns:
        ``(response, 401)`` with code ``40105`` when the type is ``'access'``
        and ``40106`` otherwise.
    """
    token_type = expired_token['type']
    msg = 'The {} token has expired'.format(token_type)
    code = '40105' if token_type == 'access' else '40106'
    # Both branches are authentication failures: the access-token branch used
    # to omit the status and therefore answered HTTP 200, contradicting its
    # own 401xx application code.
    return jsonify(code=code, msg=msg), 401

# Token is invalid
@jwt.invalid_token_loader
def invalid_token_callback(e):
    """Build the 401 response for an invalid token.

    Args:
        e: Value passed in by the loader; forwarded unchanged as ``msg``.

    Returns:
        ``(response, 401)`` with code ``40107``.
    """
    # The status was previously omitted, so an invalid token was answered
    # with HTTP 200; align it with the other 401xx handlers.
    return jsonify(code='40107', msg=e), 401

# Token has been revoked
@jwt.revoked_token_loader
def revoked_token_callback():
    """Build the 401 response for a revoked token.

    Returns:
        ``(response, 401)`` with code ``40108``.
    """
    body = jsonify(code='40108', msg='The token has been revoked')
    return body, 401

# Is the token revoked?
@jwt.token_in_blacklist_loader
def token_in_blacklist_callback(decrypted_token):
    """Report whether the given decoded token must be treated as revoked.

    Args:
        decrypted_token: Decoded token mapping; only ``'jti'`` is read.

    Returns:
        True when the cache has no entry for the jti or the entry equals
        the string 'true'; False otherwise.
    """
    # Look the flag up in the cache (Redis, per the original comment)
    cached_flag = cache.get(decrypted_token['jti'])
    # A token with no cache entry is treated as revoked
    return cached_flag is None or cached_flag == 'true'

results matching ""

    No results matching ""