Flask-SQLAlchemy

说明

  • 该扩展提供了绝大多数关系型数据库的支持,而且提供了ORM。

安装

  • pip install flask-sqlalchemy

文档

配置:

  • 名称:SQLALCHEMY_DATABASE_URI
  • 作用:连接数据库的地址
  • 取值:
数据库 地址
sqlite sqlite:/// + 数据库文件
MySQL mysql+pymysql://username:password@host:port/database
  • 使用:
from flask_sqlalchemy import SQLAlchemy

# 配置数据库连接地址
app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123456@127.0.0.1:3306/model'

# 数据表的更改追踪,需要消耗额外的资源,不需要可以关闭
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# 创建数据库操作对象
db = SQLAlchemy(app)

# 设计模型类
class User(db.Model):
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    name = db.Column(db.String(20), unique=True, nullable=False)
    email = db.Column(db.String(32), unique=True, nullable=False)

CURD操作

  • 增加数据
# 配置自动提交:在请求结束时会自动执行SQL语句
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True

@app.route('/insert/')
def insert():
    # 创建模型对象
    # longwei = User(name='longwei', email='longwei@163.com')
    # 保存对象:插入单条数据
    # db.session.add(longwei)

    wenming = User(name='wenming', email='wenming@163.com')
    kunyu = User(name='kunyu', email='kunyu@163.com')
    linfei = User(name='linfei', email='linfei@163.com')
    luoyu = User(name='luoyu', email='luoyu@163.com')
    yuhang = User(name='yuhang', email='yuhang@163.com')
    peilong = User(name='peilong', email='peilong@163.com')
    # 保存对象:插入多条数据
    db.session.add_all([wenming, kunyu, linfei, luoyu, yuhang, peilong])

    # 提交操作
    # db.session.commit()
    # 操作回滚
    # db.session.rollback()
    return '数据已添加'
  • 查寻数据
@app.route('/select/<int:uid>/')
def select(uid):
    # 根据主键进行查询:查到返回一个对象,没有查到返回None
    user = User.query.get(uid)
    if user:
        return user.name
    return '查无此人'
  • 修改数据
@app.route('/update/<int:uid>/')
def update(uid):
    user = User.query.get(uid)
    if user:
        # 修改属性
        user.email = 'xxx@163.com'
        # 没有单独的更新操作,只要再次保存,ORM会自动识别为更新操作
        db.session.add(user)
        return '数据已更改'
    return '查无此人'
  • 删除数据
@app.route('/delete/<int:uid>/')
def delete(uid):
    user = User.query.get(uid)
    if user:
        db.session.delete(user)
        return '数据已删除'
    return '查无此人'

模型设计参考

  • 常见字段类型
数据类型 python类型
Integer int
Float float
String str,变成字符串
Text str,长度不受限制的字符串
Boolean bool,布尔值,只有True/False
Date datetime.date,日期
Time datetime.time,时间
DateTime datetime.datetime,日期时间
Interval datetime.timedelta,时间间隔
PickleType pickle.dumps(),使用该方法处理后的对象
LargeBinary bytes,任意大的二进制数据
  • 常见字段选项
字段选项 说明
primary_key 是否作为主键索引,默认为False
autoincrement 是否设置自增字段,默认为False
unique 是否作为唯一索引,默认为False
index 是否作为普通索引,默认为False
nullable 字段是否可以为空,默认为True
default 设置默认值,插入数据时自动生效(表结构看不到)
  • 使用总结
    • 插入数据时可以不传值的情况:自增的字段、有默认值的、可以为空的
    • 使用flask-sqlalchemy时需要每个模型都有一个主键,通常为id
    • 模型名与数据表的名字
      • 默认将模型类名的大驼峰命名风格,转换为蛇形命名作为表名,如:
        • UserModel => user_model
      • 指定表名:__tablename__ = 'xxx'

查询操作

  • 说明:在ORM中,各种查询操作都是通过方法实现的

  • 常见操作

方法 说明
get 根据主键进行查询,找到返回一个对象,找不到返回None
get_or_404 功能同上,找不到时报404错
first 查询符合条件的第一条数据
firs_or_404 功能同上,找不到时报404错
all 查询所有数据
order_by 排序,可以指定多个字段,asc(升序),desc(降序)
offset 跳过指定数量的数据
limit 限制提取的结果集数量
count 统计结果集数量
  • 条件查询
方法 说明
filter_by 参数是关键字表达式,只能是等值条件,可以指定多个也可以调用多次
filter 参数时SQL表达式,用法同上
  • filter查询条件

    • 关系
    >, __gt__
        users = User.query.filter(User.id > 5).all()
        # 与上式等价
        users = User.query.filter(User.id.__gt__(5)).all()
    >=, __ge__
    <, __lt__
    <=, __le__
    ==, __eq__
    !=, __ne__
    
    • 范围
    users = User.query.filter(User.id.between(5, 8)).all()
    users = User.query.filter(User.id.in_((1, 3, 5, 7))).all()
    users = User.query.filter(User.id.notin_((1, 3, 5, 7))).all()
    
    • 内容
    users = User.query.filter(User.name.startswith('l')).all()
    users = User.query.filter(User.name.endswith('ng')).all()
    users = User.query.filter(User.name.contains('n')).all()
    users = User.query.filter(User.name.like('%an%')).all()
    users = User.query.filter(User.name.notlike('%an%')).all()
    # 忽略大小写的模糊匹配
    users = User.query.filter(User.name.ilike('%an%')).all()
    users = User.query.filter(User.name.notilike('%an%')).all()
    
    • 逻辑
    # 默认多个条件就是and关系,通常省略
    users = User.query.filter(User.id > 3, User.id < 8).all()
    # 指定为and关系,功能同上
    users = User.query.filter(and_(User.id > 3, User.id < 8)).all()
    # 指定为or关系
    users = User.query.filter(or_(User.id < 3, User.id > 8)).all()
    
  • 其他查询

    • 等价查询
    @app.route('/dengjia/')
    def dengjia():
        user = User.query.get(1)
        # 上下等价
        user = db.session.query(User).get(1)
        if user:
            return user.name
        return '查无此人'
    
    • 聚合函数
    from sqlalchemy import func
    
    # 聚合函数:max、min、avg、sum、count
    @app.route('/juhe/')
    def juhe():
        max_id = db.session.query(func.max(User.id)).scalar()
        return str(max_id)
    
    • 分组统计
    @app.route('/group/')
    def group():
        ret = db.session.query(User.sex, func.count(User.sex)).group_by(User.sex).all()
        print(ret)
        return '分组统计'
    
    • 指定字段
    @app.route('/fields/')
    def fields():
        ret = User.query.with_entities(User.name, User.sex).all()
        print(ret)
        return '指定字段查询'
    
    • 执行SQL语句
    @app.route('/rawsql/')
    def rawsql():
        # users = db.session.execute('select * from user')
        # 可以动态拼接SQL语句
        users = db.session.execute('select * from user where id > :id', params={'id': 5})
        # 获取一条
        # user = users.fetchone()
        # return user.name
        # 获取多条
        # user_many = users.fetchmany(3)
        # return ','.join(u.name for u in user_many)
        # 获取所有
        user_all = users.fetchall()
        return ','.join(u.name for u in user_all)
    
    • 分页查询
    @app.route('/paginate/')
    def paginate():
        # 返回一个Pagination对象
        pagination = User.query.paginate(page=1, per_page=3, error_out=False)
        # 当前页的数据
        users = pagination.items
        return ', '.join(u.name for u in users)
    
    • 分页查询说明
    方法:paginate
        参数:
            page:当前页码,默认为1
            per_page:每页大小,默认为20
            error_out:有错时是否报错
        返回值:
            Pagination对象,其中包含了所有的分页信息
    
    Pagination:
        属性:
            page:当前页码
            per_page:页大小
            pages:总页数
            total:数据总量
            prev_num:上一页页码
            next_num:下一页页码
            has_prev:是否有上一页
            has_next:是否有下一页
            items:当前页的数据
        方法:
            iter_pages:返回一个迭代器,分页导航条上页码,显示不完的返回None
            prev:上一页的分页对象
            next:下一页的分页对象
    
    • SQL日志:查看执行的SQL语句
    # 以下三种任意一种方式都可以
    
    # 是否开启调试模式
    app.config['DEBUG'] = True
    # 是否开始测试模式
    app.config['TESTING'] = True
    # 专门用来开启SQL日志记录的
    app.config['SQLALCHEMY_RECORD_QUERIES'] = True
    
    from flask_sqlalchemy import get_debug_queries
    
    for query in get_debug_queries():
        print(query)
    

模型关系

  • 一对多(使用最多)

    • 一:学生(Student)
      • 添加反向引用:articles=db.relationship('Article',backref='stu',lazy='dynamic')
    • 多:文章(Article)
      • 添加外键关联:sid = db.Column(db.Integer, db.ForeignKey('student.id'))
  • 一对一:其实是一对多的一种特殊情况

    • 一:学生(Student)
      • 添加反向引用:profile=db.relationship('Profile',backref='stu',uselist=False)
    • 一:详情(Profile)
      • 添加外键关联:sid = db.Column(db.Integer, db.ForeignKey('student.id'))
  • 多对多

    • 多:学生(Student)

      • 添加反向引用:必须使用secondary指定中间表
      courses = db.relationship(
          'Course', 
          lazy='dynamic', 
          secondary='selection', 
          backref=db.backref('students', lazy='dynamic')
      )
      
    • 多:课程(Course)

    • 中间关联表(选课表),不需要手动维护

      selection = db.Table('selection',
          db.Column('student_id', db.Integer, db.ForeignKey('student.id')),
          db.Column('course_id', db.Integer, db.ForeignKey('course.id'))
      )
      
  • 参考代码

    from flask import Flask
    from flask_script import Manager
    from flask_sqlalchemy import SQLAlchemy
    from flask_migrate import MigrateCommand, Migrate
    
    app = Flask(__name__)
    
    app.config['SQLALCHEMY_DATABASE_URI'] = 'mysql+pymysql://root:123456@127.0.0.1:3306/model3'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True
    
    db = SQLAlchemy(app)
    migrate = Migrate(app, db)
    manager = Manager(app)
    manager.add_command('db', MigrateCommand)
    
    # 学生模型
    class Student(db.Model):
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        name = db.Column(db.String(20), unique=True, nullable=False)
        """
        添加反向引用:不会影响表结构,无需迁移
        backref:反向引用的字段名
        lazy:关联数据的加载时机
            'select' / True:首次使用时自动查询,默认选项
            'joined' / False:关联查询时
            'subquery':子查询时
            'dynamic':不加载数据,提供了关联数据的查询(不能在一的一侧使用)
        """
        articles = db.relationship('Article', backref='stu', lazy='dynamic')
        # 添加一对一反向引用(uselist=False)
        profile = db.relationship('Profile', backref='stu', uselist=False)
        # 添加多对多反向引用,必须使用secondary指定中间关联表
        courses = db.relationship('Course', lazy='dynamic', secondary='selection', 
                                  backref=db.backref('students', lazy='dynamic'))
    
    # 文章模型
    class Article(db.Model):
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        title = db.Column(db.String(20), unique=True, nullable=False)
        content = db.Column(db.Text)
        # 添加外键关联
        sid = db.Column(db.Integer, db.ForeignKey('student.id'))
    
    # 详情模型
    class Profile(db.Model):
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        realname = db.Column(db.String(20), unique=True, nullable=False)
        # 添加外键关联
        sid = db.Column(db.Integer, db.ForeignKey('student.id'))
    
    # 课程模型
    class Course(db.Model):
        id = db.Column(db.Integer, primary_key=True, autoincrement=True)
        name = db.Column(db.String(20), unique=True, nullable=False)
    
    # 学生选课关联表(中间表),ORM会自动维护
    selection = db.Table('selection',
        db.Column('student_id', db.Integer, db.ForeignKey('student.id')),
        db.Column('course_id', db.Integer, db.ForeignKey('course.id'))
    )
    
    # 多对多
    @app.route('/many_many/')
    def many_many():
        ''''''
        student = Student.query.get(1)
        course = Course.query.get(3)
        # 选课
        # student.courses.append(course)
        # return student.name + '选择了' + course.name
        # 取消选课
        student.courses.remove(course)
        return student.name + '取消选择了' + course.name
    
        '''
        # 根据学生找课程
        student = Student.query.get(1)
        courses = student.courses.all()
        return ', '.join(c.name for c in courses)
        '''
    
        '''
        # 根据课程找学生
        course = Course.query.get(3)
        students = course.students.all()
        return ', '.join(s.name for s in students)
        '''
    
    # 一对一
    @app.route('/one_one/<int:xid>/')
    def one_one(xid):
        # 根据学生找详情
        # student = Student.query.get(xid)
        # return student.profile.realname
        # 根据详情找学生
        profile = Profile.query.get(xid)
        return profile.stu.name
    
    # 一对多
    @app.route('/one_many/<int:xid>/')
    def one_many(xid):
        '''
        # 原生查询
        student = Student.query.get(xid)
        articles = Article.query.filter(Article.sid == xid).all()
        return ', '.join(a.title for a in articles)
        '''
    
        '''
        # 使用join
        students = Student.query.join(Article, Student.id == Article.sid).all()
        return ', '.join(s.name for s in students)
        '''
        # 关联查询
        # 根据学生找文章
        # student = Student.query.get(xid)
        # articles = student.articles.all()
        # return ', '.join(a.title for a in articles)
        # 根据文章找学生
        article = Article.query.get(xid)
        return article.stu.name
    
    @app.route('/')
    def index():
        return '模型使用'
    
    if __name__ == '__main__':
        manager.run()
    

results matching ""

    No results matching ""