一、基本使用

1、环境安装

pip install flask-sqlalchemy
pip install pymysql

2、组件初始化

2.1、基本的配置

  1. 首先先安装两个依赖的包。
  2. 配置数据库的连接:app.config[‘SQLALCHEMY_DATABASE_URI’] = “mysql://root:mysql@192.168.44.128:3306/test39”
  3. 关闭数据库的跟踪:app.config[‘SQLALCHEMY_TRACK_MODIFICATIONS’] = False
  4. 开启输出sql语句:app.config[‘SQLALCHEMY_ECHO’] = True
  5. 两种处理python2和python3的名字不一致问题。
from flask import Flask
from flask_restful import Api, Resource
from flask_sqlalchemy import SQLAlchemy

import pymysql
pymysql.install_as_MySQLdb()

"""
python2中数据库客户端: MySqldb
python3中数据库客户端:pymysql
解决方案一:让python2和python3的包进行转换。
import pymysql
pymysql.install_as_MySQLdb()

方案二:表示只使用python3的包,不使用python2的包
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql+pymysql://root:mysql@192.168.44.128:3306/test39"
  1. flask中注册

app = Flask(__name__)
db = SQLAlchemy(app)

# app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://账号:密码@数据库ip地址:端口号/数据库名"
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:mysql@192.168.44.128:3306/test39"
# app.config['SQLALCHEMY_BINDS'] = {}

# 关闭数据库修改跟踪操作[提高性能]:
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

# 开启输出底层执行的sql语句
app.config['SQLALCHEMY_ECHO'] = True

# 开启数据库的自动提交功能[一般不使用]
app.config['SQLALCHEMY_COMMIT_ON_TEARDOWN'] = True

@app.route('/')
def hello_word():
    return "hello, word"

if __name__ == '__main__':
    print(app.url_map)
    app.run(host='0.0.0.0', port= 8000, debug=True)

2.2、结合工厂方法进行配置

  1. 数据库配置信息存放在环境类中加载。
  2. 由于数据库对象和app对象不一定谁先创建,所以可以先创建数据库对象,等app对象创建之后再进行关联。
  3. 进行关联的函数是:数据库对象调用自己的init_app()方法。需要传入app对象。

settings中配置:

# 开发环境
class DevelopmentConfig(BaseConfig):
    """开发环境配置类"""
    DEBUG = True
    # SQL数据库连接信息
    SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:mysql@192.168.243.157:3306/test39"
    # 关闭数据库修改跟踪操作 【提高性能】
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    # 开启输出底层执行sql语句
    SQLALCHEMY_ECHO = True
主模块:

from flask import Flask, make_response, Response, request, current_app
from settings import config_dict
from flask_sqlalchemy import SQLAlchemy

# 延后加载
# 创建了数据库,此时数据库对象还没有跟app关联
db = SQLAlchemy()

# 定义一个工厂方法:
def create_app(config_name):
    app = Flask(__name__)

    config_class = config_dict[config_name]
    app.config.from_object(config_class)

    app.config.from_envvar('CONFIG', silent=True)

    # 懒加载
    db.init_app(app)
    return app

app = create_app("dev")

@app.route('/login')
def login():
    return ""

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=8000, debug=True)

3、构建模型类

3.1、基础模型

在这里插入图片描述

  • 模型类必须继承 db.Model, 其中 db 指对应的组件对象
  • 表名默认为类名小写, 可以通过 __tablename__类属性 进行修改
  • 类属性对应字段, 必须是通过 db.Column() 创建的对象
  • 可以通过 create_all() 和 drop_all()方法 来创建和删除所有模型类对应的表
  • 注意点: 如果没有给对应字段的类属性设置default参数, 且添加数据时也没有给该字段赋值, 则 sqlalchemy会给该字段设置默认值 None

SQLAlchemy常用列类型

类型名 Python类型 说明
Integer int 普通整数,32位
SmallInteger int 16位整数
BigInteger int或long 不限制精度整数
Float float 浮点数
Numeric decimal.Decimal 定点数
String str 字符串
Text str 长文本
Unicode unicode Unicode字符串
UnicodeText unicode 长Unicode字符串
Boolean bool 布尔值
Date datetime.date 日期
Time datetime.time 时间
Datetime datetime.datetime 日期时间
Interval datetime.timedelta 时间间隔
Enum str 一组字符串
PickleType 任何Python对象 使用Pickle序列化
LargeBinary str 二进制blob

SQLAlchemy常用列约束

约束名 说明
primary_key True代表主键
unique True代表值唯一
index True代表为列添加索引
nullable True代表允许使用空值
default 列的默认值

<模型类创建案例>

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 相关配置
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:mysql@192.168.44.128:3306/test39"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True

# 创建组件对象
db = SQLAlchemy(app)


# 构建模型类  类->表  类属性->字段  实例对象->记录
class User(db.Model):
    __tablename__ = 't_user'  # 设置表名, 表名默认为类名小写
    id = db.Column(db.Integer, primary_key=True)  # 设置主键, 默认自增
    name = db.Column('username', db.String(20), unique=True)  # 设置字段名 和 唯一约束
    age = db.Column(db.Integer, default=10, index=True)  # 设置默认值约束 和 索引


if __name__ == '__main__':
    # 删除所有继承自db.Model的表
    db.drop_all()
    # 创建所有继承自db.Model的表
    db.create_all()
    app.run(host="0.0.0.0", port=8000, debug=True)

3.2 、外键

外键是一种数据库约束,用于确保表之间的关系完整性。在Flask中,您可以使用db.relationship来定义外键关系。

假设我们有一个Post模型,它与User模型存在外键关系,每个帖子都是由某个用户创建的。我们可以使用db.relationship来定义这种关系:

class Post(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(80))
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'))
    user = db.relationship('User', backref='posts')

在这个例子中,我们定义了一个user_id列,它是User模型的外键。我们还定义了一个user属性,它是一个User对象,并使用backref参数将其与User模型中的posts属性关联起来。

3.3、 一对多关系

一对多关系是一种模型关系,其中一个模型可以有多个关联模型。在Flask中,您可以使用外键和db.relationship来定义一对多关系。

假设我们有一个Category模型,每个类别可以有多篇文章。我们可以使用外键来定义这种关系:

class Category(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

class Article(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(80))
    body = db.Column(db.Text)
    category_id = db.Column(db.Integer, db.ForeignKey('category.id'))
    category = db.relationship('Category', backref='articles')

在这个例子中,我们定义了一个category_id列,它是Category模型的外键。我们还定义了一个category属性,它是一个Category对象,并使用backref参数将其与Category模型中的articles属性关联起来。这样,我们可以通过Category对象访问它的所有文章。

3.4、多对多关系

多对多关系是一种模型关系,其中一个模型可以与多个其他模型相关联,并且这些模型也可以与其他模型相关联。在Flask中,您可以使用db.relationship和关联表来定义多对多关系。

假设我们有一个Tag模型和一个Article模型,每篇文章可以有多个标签,每个标签也可以与多篇文章相关联。我们需要创建一个关联表来存储这些关系:

tags = db.Table('tags',
    db.Column('tag_id', db.Integer, db.ForeignKey('tag.id')),
    db.Column('article_id', db.Integer, db.ForeignKey('article.id'))
)

class Tag(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(80))

class Article(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(80))
    body = db.Column(db.Text)
    tags = db.relationship('Tag', secondary=tags,
        backref=db.backref('articles', lazy='dynamic'))

在这个例子中,我们定义了一个tags表来存储文章和标签之间的关系。我们还定义了Tag和Article模型,并使用db.relationship来定义它们之间的多对多关系。secondary参数指定了关联表,backref参数指定了Tag模型中的articles属性,并使用lazy参数来指定加载模式。

这些示例演示了如何在Flask中使用SQLAlchemy来定义模型之间的关系。通过使用ORM层,我们可以轻松地创建和管理数据库应用程序,而无需编写任何SQL语句。

二、数据操作

1、增加数据

  1. 给模型对象设置数据 可以通过 初始化参数 或者 赋值属性 两种方式
  2. session.add(模型对象) 添加单条数据到会话中, session.add_all(列表) 添加多条数据到会话中
  3. sqlalchemy 会 自动创建事务, 并将数据操作包含在事务中, 提交会话时就会提交事务,事务提交失败会自动回滚。
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 相关配置
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:mysql@192.168.44.128:3306/test39"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True

# 创建组件对象
db = SQLAlchemy(app)


# 构建模型类
class User(db.Model):
    __tablename__ = 't_user'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column('username', db.String(20), unique=True)
    age = db.Column(db.Integer, index=True)


@app.route('/')
def index():
    """增加数据"""

    # 1.创建模型对象
    user1 = User(name='zs', age=20)
    # user1.name = 'zs'
    # user1.age = 20

    # 2.将模型对象添加到会话中
    db.session.add(user1)
    # 添加多条记录
    # db.session.add_all([user1, user2, user3])

    # 3.提交会话 (会提交事务)
    # sqlalchemy会自动创建隐式事务
    # 事务失败会自动回滚
    db.session.commit()

    return "index"


if __name__ == '__main__':
    db.drop_all()
    db.create_all()
    app.run(host="0.0.0.0", port=8000, debug=True)

2、查询数据

  1. 数据的准备工作
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 配置数据库连接
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:mysql@192.168.44.128:3306/test39"
# 配置取消数据库跟踪
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# 配置数据库输出SQL语句
app.config['SQLALCHEMY_ECHO'] = True
# 创建数据库对象
db = SQLAlchemy(app)

class User(db.Model):
    # 指定表名:默认使用类名小写
    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(64))
    email = db.Column(db.String(64))
    age = db.Column(db.Integer)

    def __repr__(self):
        return "(%s, %s, %s, %s)"%(self.id, self.name, self.email, self.age)


if __name__ == '__main__':
    # 删除所有表
    # db.drop_all()
    # # 创建所有表
    # db.create_all()
    # # 添加测试数据
    # user1 = User(name='wang', email='wang@163.com', age=20)
    # user2 = User(name='zhang', email='zhang@189.com', age=33)
    # user3 = User(name='chen', email='chen@126.com', age=23)
    # user4 = User(name='zhou', email='zhou@163.com', age=29)
    # user5 = User(name='tang', email='tang@itheima.com', age=25)
    # user6 = User(name='wu', email='wu@gmail.com', age=25)
    # user7 = User(name='qian', email='qian@gmail.com', age=23)
    # user8 = User(name='liu', email='liu@itheima.com', age=30)
    # user9 = User(name='li', email='li@163.com', age=28)
    # user10 = User(name='sun', email='sun@163.com', age=26)
    #
    # # 一次添加多条数据
    # db.session.add_all([user1, user2, user3, user4, user5, user6, user7, user8, user9, user10])
    # db.session.commit()
    app.run(host="0.0.0.0",port=8000, debug=True)

2、进行查询操作

# 1:查询所有的用户数据:
    users = User.query.all()
    print(type(users))
    print(users)

    # 2:查询一共有多少个用户:
    count = User.query.count()
    print("一共有{}个人".format(count))

    # 3:查询第一个用户信息:
    user1 = User.query.first()
    print("第一个用户的信息是:{}".format(user1))

    # 4:查询id为4的三种方式:
    #<方案一>:根据id查询,返回模型类对象
    user4 = User.query.get(4)
    print("第四个用户的信息是{}".format(user4))

    # <方案二>:等值过滤器 关键字实参设置字段值  返回BaseQuery对象
    user4 = User.query.filter_by(id=4).first()
    print("第四个用户的信息是{}".format(user4))

    # <方案三>:使用复杂过滤器,返回BaseQuery对象
    user4 = User.query.filter(User.id == 4).first()
    print("第四个用户的信息是{}".format(user4))

    # 5:查询用户名字,开始,结尾,包含n的用户
    user = User.query.filter(User.name.startswith('n')).all()
    print("名字以n开头的用户{}".format(user))
    user = User.query.filter(User.name.endswith("n")).all()
    print("名字以n结尾的用户{}".format(user))
    user = User.query.filter(User.name.contains("n")).all()
    print("名字中包含n的用户:{}".format(user))
    # 6:查询名字和邮箱都以li开头的所有用户[2种方式]
    users = User.query.filter(User.name.startswith('li'), User.email.startswith('li')).all()
    print("查询名字和邮箱都以li开头的所有用户:{}".format(users))

    users = User.query.filter(and_(User.name.startswith('li'), User.email.startswith('li'))).all()
    print("查询名字和邮箱都以li开头的所有用户:{}".format(users))

    # 7:查询age是25或者email以com结尾的所有用户
    users = User.query.filter(or_(User.age==25, User.email.endswith('com'))).all()
    print("age是25或者email以com结尾的所有用户 : {}".format(users))

    # 8: 查询名字不等于wang的所有用户
    users = User.query.filter(User.name != "wang").all()
    print("名字不等于wang的所有用户: {}".format(users))
    users= User.query.filter(not_(User.name=="wang")).all()
    print("名字不等于wang的所有用户: {}".format(users))

    # 9: 查询id是[1, 3, 5, 7, 9]的用户
    users = User.query.filter(User.id.in_([1, 3, 5, 7, 9])).all()
    print("id是[1, 3, 5, 7, 9]的用户: {}".format(users))

    # 10:所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个
    users = User.query.order_by(User.age, User.id.desc()).limit(5).all()
    print("所有用户先按年龄从小到大, 再按id从大到小排序, 取前5个: {}".format(users))
# 11:查询年龄从小到大第2-5位的数据
    users = User.query.order_by(User.age).offset(1).limit(4).all()
    print("查询年龄从小到大第2-5位的数据: {}".format(users))

    # 12: 分页查询, 每页3个, 查询第2页的数据  paginate(页码, 每页条数)
    pn = User.query.paginate(2, 3)
    print("总页数是:", pn.pages)
    print("当前页:", pn.page)
    print("当前页的数据:", pn.items)
    print("当前页的总条数", pn.total)

    # 13: 查询每个年龄段的人数:(分组聚合)
    data = db.session.query(User.age, func.count(User.id)).group_by(User.age).all()
    for item in data:
        print(item[0], item[1])
    # 注意可以给列起别名,但是windows下会报错,linux下不会报错。
    # data = db.session.query(User.age, func.count(User.id).label("count")).group_by(User.age).all()
    # for item in data:
    #     # print(item[0], item[1])
    #     print(item.age, item.count)  # 建议通过label()方法给字段起别名, 以属性方式获取数据

    # 14:只查询所有人的姓名和邮箱,这种相当于全表查询,效率非常低。
    data = db.session.query(User.name, User.email).all()
    for item in data:
        print(item.name, item.email)

    # 15:优化查询
    data = User.query.options(load_only(User.name, User.email)).all()
    for item in data:
        print(item.name, item.email)
    return "index"

3、修改数据

  1. 方案一:先查询再更新
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

app = Flask(__name__)

# 相关配置
app.config['SQLALCHEMY_DATABASE_URI'] = "mysql://root:mysql@192.168.44.128:3306/test39"
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True

# 创建组件对象
db = SQLAlchemy(app)


# 构建模型类  商品表
class Goods(db.Model):
    __tablename__ = 't_good'  # 设置表名
    id = db.Column(db.Integer, primary_key=True)  # 设置主键
    name = db.Column(db.String(20), unique=True)  # 商品名称
    count = db.Column(db.Integer)  # 剩余数量


@app.route('/')
def purchase():
    """购买商品"""

    # 更新方式1: 先查询后更新
    # 缺点: 并发情况下, 容易出现更新丢失问题 (Lost Update)
    # 1.执行查询语句, 获取目标模型对象
    goods = Goods.query.filter(Goods.name == '方便面').first()
    # 2.对模型对象的属性进行赋值 (更新数据)
    goods.count = goods.count - 1
    # 3.提交会话
    db.session.commit()

    return "index"
  1. 方案二:配合查询过滤器filter() 和 更新执行器update() 进行数据更新
Goods.query.filter(Goods.name == '方便面').update({'count': Goods.count - 1})
db.session.commit()

3.总结
推荐采用方案二,因为一条语句, 被网络IO影响程度低, 执行效率更高,查询和更新在一条语句中完成, 单条SQL具有原子性, 不会出现更新丢失问题,会对满足过滤条件的所有记录进行更新, 可以实现批量更新处理。

4、删除数据

方案一:

 # 方式1: 先查后删除
    goods = Goods.query.filter(Goods.name == '方便面').first()
    # 删除数据
    db.session.delete(goods)
    # 提交会话 增删改都要提交会话
    db.session.commit()

方案二:

# 方式2: delete子查询
    Goods.query.filter(Goods.name == '方便面').delete()
    # 提交会话
    db.session.commit()

三、高级机制

1、刷新数据

  1. Session 被设计为数据操作的执行者, 会先将操作产生的数据保存到内存中。
  2. 在执行 flush刷新操作 后, 数据操作才会同步到数据库中。
  3. 隐式刷新操作:1:提交会话 2:查询操作(包括更新和删除中的子查询)。
  4. 手动刷新:session.flush()

刷新机制的理解:
答:刷新机制就是通过事务,将SQl语句执行一遍,然后将执行结果存储在变量中,但是数据库做回滚操作。导致变量中有了新值,但是数据库却没有改变。

goods = Goods(name='方便面', count=20)
db.session.add(goods)
# 主动执行flush操作, 立即执行SQL操作(数据库同步)
print(goods.id)  # 此时是None
db.session.flush()
print(goods.id) # 此时是1
# Goods.query.count()  # 查询操作会自动执行flush操作
db.session.commit()  # 提交会话会自动执行flush操作

2、多表查询

外键关联查询:

生成主表对象后,必须刷新数据库,否则后面无法使用主表对象的属性。

1:主从表的定义:

# 用户表  一   一个用户可以有多个地址
class User(db.Model):
    __tablename__ = 't_user'
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(20))


# 地址表   多
class Address(db.Model):
    __tablename__ = 't_adr'
    id = db.Column(db.Integer, primary_key=True)
    detail = db.Column(db.String(20))
    user_id = db.Column(db.Integer)  # 定义外键

2:添加关联数据:

def index():
    """添加并关联数据"""

    user1 = User(name='张三')
    db.session.add(user1)
    db.session.flush()  # 必须刷新,不然后面的user1.id是None
    adr1 = Address(detail='中关村3号', user_id=user1.id)
    adr2 = Address(detail='华强北5号', user_id=user1.id)
    db.session.add_all([adr1, adr2])
    db.session.commit()
    return "index"
3:关联查询:

# 1.先根据姓名查找到主表主键
    user1 = User.query.filter_by(name='张三').first()

    # 2.再根据主键到从表查询关联地址
    adrs = Address.query.filter_by(user_id=user1.id).all()
    for adr in adrs:
        print(adr.detail)

Logo

有“AI”的1024 = 2048,欢迎大家加入2048 AI社区

更多推荐