Note

Use session.query rather than Model.query; Model.query requires the database data to be mapped to the model.

session.add() puts the object into the session's pending state, so its SQL has not been executed yet. A later session.query forces the pending SQL to execute before the query runs, whereas Model.query does not appear to force the execution of pending SQL. (The underlying mechanism and implementation still need to be studied.)
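The flush-before-query behaviour can be observed with plain SQLAlchemy. The following is a minimal sketch, assuming an in-memory SQLite database and a hypothetical Note model (neither appears in the original). It only shows that a query issued through the session flushes pending objects first when autoflush is enabled; it does not explain the Model.query observation.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Note(Base):  # hypothetical model, for illustration only
    __tablename__ = "note"
    id = Column(Integer, primary_key=True)
    title = Column(String(50))


engine = create_engine("sqlite://")  # in-memory database (assumption)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine, autoflush=True)()

session.add(Note(title="draft"))  # the object is pending, no INSERT has run yet

# With autoflush suspended, the query does not see the pending row.
with session.no_autoflush:
    count_without_flush = session.query(Note).count()

# A normal query flushes pending changes first, so the INSERT runs before the SELECT.
count_with_flush = session.query(Note).count()

session.rollback()  # nothing was committed
```

Here count_without_flush and count_with_flush differ only because of the flush that session.query triggers.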

session

Start by creating the db object, which in Flask_SQLAlchemy wraps everything needed to work with the database

from flask import Flask
from flask_sqlalchemy import SQLAlchemy


app = Flask(__name__)
db = SQLAlchemy(app, engine_options={'pool_recycle': 3600, 'pool_size': 20})
# session = db.session

Flask_SQLAlchemy relies on SQLAlchemy's default MySQL driver, MySQLdb, which is installed with pip install mysql-python, but that package no longer supports Python 3. Installing it under Python 3 raises ModuleNotFoundError: No module named 'ConfigParser', because Python 3 renamed ConfigParser.py to configparser.py.

from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
# dialect+driver://user:password@host:port/database?charset=utf8
mysql_uri = "mysql+pymysql://root:[email protected]:3306/SQLAlchemyTest?charset=utf8"

# paramstyle in ["qmark", "numeric", "named", "format" or "pyformat"]
engine = create_engine(
    mysql_uri,
    paramstyle=None,
    echo=True,
    echo_pool=True,
    pool_pre_ping=True,  # pessimistic mode: ping with a SELECT on each checkout and retry after a failure
)
session_caller = sessionmaker(bind=engine, autoflush=True, autocommit=False, expire_on_commit=True, info=None)
session = session_caller()
# session = scoped_session(session_factory=session_caller, scopefunc=None)

The paramstyle argument of create_engine sets the DB-API parameter style, as described in PEP 249 (paramstyle).

When pymysql is used as the SQLAlchemy driver, connectionless execution runs into a paramstyle mismatch between SQLAlchemy and pymysql. pymysql declares paramstyle as pyformat in its __init__.py, so if you specify paramstyle at all, specify None or 'pyformat'; any other value causes an error.

This operation works properly in Connection mode.

If no transaction is involved, Connection mode is recommended: create the connection with engine.connect() and execute the SQL on it.
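Connection mode can be sketched as follows. This is a minimal example, assuming an in-memory SQLite database instead of the MySQL server used in this post, with a cut-down dev_user table created only for the demonstration. Wrapping the statement in text() lets SQLAlchemy translate the :name placeholders into whatever paramstyle the driver expects, so the statement itself does not depend on the driver.

```python
from sqlalchemy import create_engine, text

engine = create_engine("sqlite://")  # in-memory database (assumption)

# engine.begin() opens a connection and commits when the block exits without error.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE dev_user (id INTEGER PRIMARY KEY, username TEXT)"))
    conn.execute(
        text("INSERT INTO dev_user (id, username) VALUES (:id, :username)"),
        {"id": 100, "username": "Jack"},
    )

# engine.connect() is enough for a read-only statement.
with engine.connect() as conn:
    row = conn.execute(
        text("SELECT id, username FROM dev_user WHERE id = :user_id"),
        {"user_id": 100},
    ).fetchone()

user_id, username = row[0], row[1]
```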

Using named

# use named
engine = create_engine(mysql_uri, paramstyle='named')
session = sessionmaker(engine)()
sql = 'select * from dev_user where id={}'
session.execute(sql.format(':user_id'), {'user_id': 100}).fetchone()
# sqlalchemy did not format :user_id, but passed it directly to Pymysql, causing an error
# ProgrammingError: (pymysql.err.ProgrammingError) (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ':user_id' at line 1")
# [SQL: select * from dev_user where id=:user_id]
# [parameters: {'user_id': 100}]

Using pyformat

# use pyformat
engine.paramstyle = 'pyformat'
session.execute(sql.format('%(user_id)s'), {'user_id': 100}).fetchone()
# sqlalchemy failed to retrieve the params parameter
# ProgrammingError: (pymysql.err.ProgrammingError) (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '%(user_id)s' at line 1")
# [SQL: select * from dev_user where id=%%(user_id)s]

# Using named-style parameters works
session.execute(sql.format(':user_id'), {'user_id': 100}).fetchone()
# SQL:
# select * from dev_user where id=%(user_id)s
# {'user_id': 100}

SQLAlchemy can be used in three ways:

  • raw SQL
  • SQL expression
  • ORM

If SQLAlchemy is used with cx_Oracle, you need to disable the connection pool, otherwise exceptions will occur. Set sqlalchemy.poolclass to sqlalchemy.pool.NullPool.

Support for transactions

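Create a connection to start a transaction. Savepoints can be created for data that has been added but not yet committed.

db.session.add(u1)
# Create a savepoint
savepoint = db.session.begin_nested()

db.session.add(u2)
# Roll back to the savepoint: only u2 is discarded, not u1.
# (With SQLAlchemy 2.0-style sessions, db.session.rollback() rolls back the whole transaction instead.)
savepoint.rollback()
db.session.commit()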
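The savepoint flow above can be tried end to end with plain SQLAlchemy. This is a sketch under assumptions: an in-memory SQLite database is used only to keep it self-contained, and the Person model is hypothetical. It calls rollback() on the object returned by begin_nested(), which rolls back to the savepoint regardless of the SQLAlchemy version.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Person(Base):  # hypothetical model, for illustration only
    __tablename__ = "person"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


engine = create_engine("sqlite://")  # in-memory database (assumption)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add(Person(name="u1"))
savepoint = session.begin_nested()  # flushes u1, then emits SAVEPOINT

session.add(Person(name="u2"))
session.flush()                     # INSERT u2 inside the savepoint
savepoint.rollback()                # ROLLBACK TO SAVEPOINT: discards u2 only
session.commit()                    # commits u1

names = [p.name for p in session.query(Person).order_by(Person.id)]
```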

Here is the database model demo

from sqlalchemy.dialects.mysql import INTEGER


class User(db.Model):
    __tablename__ = "dev_user"
    id = db.Column(INTEGER(unsigned=True), primary_key=True)
    username = db.Column(db.String(80), unique=True, doc="Username", comment="User id")
    email = db.Column(db.String(120), unique=True, doc="User email", comment="User email")
    is_active = db.Column(db.Boolean, default=False, doc="Active", comment="Is it active?")
    phone = db.Column(db.String(20), index=True, doc="User's Mobile phone Number", comment="User's Mobile phone Number")
    age = db.Column(INTEGER(), doc="User age", comment="User age")

    def __repr__(self):
        return f"<User {self.username!r}>"


class Blog(db.Model):
    __tablename__ = "blog"
    id = db.Column(INTEGER(unsigned=True), primary_key=True)
    content = db.Column(db.Text, comment="Blog content")
    user_id = db.Column(INTEGER(unsigned=True), db.ForeignKey("dev_user.id"), index=True,
                        doc="User id", comment="User id")


class Comment(db.Model):
    __tablename__ = "comment"
    id = db.Column(INTEGER(unsigned=True), primary_key=True)
    comment = db.Column(db.String(150), doc="User Reviews", comment="User Reviews")
    user_id = db.Column(INTEGER(unsigned=True), index=True, doc="User id", comment="User id")


class UserInfo(db.Model):
    __tablename__ = "user_info"
    id = db.Column(INTEGER(unsigned=True), primary_key=True)
    user_id = db.Column(INTEGER(unsigned=True), db.ForeignKey("dev_user.id"),
                        index=True, doc="User id", comment="User id")
    user = db.relationship("User", backref="user_info", uselist=False)
    company = db.Column(db.String(100), unique=True, doc="Company abbreviation", comment="Company abbreviation")

Multiple database support

Flask_sqlalchemy uses the concept of bind to bind multiple databases

# Option 1: set the binds in a config class
class Config(object):
    SQLALCHEMY_DATABASE_URI = 'postgres://localhost/main'
    SQLALCHEMY_BINDS = {
        'users':        'mysqldb://localhost/users',
        'appmeta':      'sqlite:////path/to/appmeta.db'
    }

# Option 2: set the binds directly on app.config
app.config["SQLALCHEMY_DATABASE_URI"] = 'postgres://localhost/main'
app.config["SQLALCHEMY_BINDS"] = {
    'users':        'mysqldb://localhost/users',
    'appmeta':      'sqlite:////path/to/appmeta.db'
}

Many Flask_SQLAlchemy wrappers take a bind parameter that selects one of the configured databases, for example

db.session.execute(sql, bind='user')
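Outside Flask, plain SQLAlchemy offers a comparable mechanism: the binds argument of Session/sessionmaker maps each model to an engine. The sketch below uses that argument, not Flask_SQLAlchemy's SQLALCHEMY_BINDS, and assumes two in-memory SQLite databases and two hypothetical models, Account and AppMeta.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Account(Base):  # hypothetical model stored in the "users" database
    __tablename__ = "account"
    id = Column(Integer, primary_key=True)
    name = Column(String(80))


class AppMeta(Base):  # hypothetical model stored in the "appmeta" database
    __tablename__ = "app_meta"
    id = Column(Integer, primary_key=True)
    key = Column(String(80))


# Two independent in-memory databases (assumption).
users_engine = create_engine("sqlite://")
appmeta_engine = create_engine("sqlite://")
Base.metadata.create_all(users_engine, tables=[Account.__table__])
Base.metadata.create_all(appmeta_engine, tables=[AppMeta.__table__])

# Each model is routed to its own engine.
Session = sessionmaker(binds={Account: users_engine, AppMeta: appmeta_engine})
session = Session()
session.add_all([Account(name="Jack"), AppMeta(key="version")])
session.commit()

users_tables = inspect(users_engine).get_table_names()
appmeta_tables = inspect(appmeta_engine).get_table_names()
account_count = session.query(Account).count()
```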

The differences in SQL are as follows:

-- without bind
SELECT user.username AS username_1 FROM user WHERE user.id = 100;
-- with bind
SELECT test.user.username AS username_1 FROM test.user WHERE test.user.id = 100; -- table names are prefixed with the database name

Use multiple database support in database definitions

class User(db.Model):
    __tablename__ = "user"
    # specify '__bind_key__' directly selects the specified database when executing SQL statements
    __bind_key__ = 'users'
    # Or use __table_args__
    __table_args__ = {
        'schema': "users",  # specify the database (schema)
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8mb4',
    }
    # __table_args__ can also use tuples
    # __table_args__ = (
    # db.UniqueConstraint("username", "id", name="uix_username_id"),
    # db.Index("ix_username", "username")
    #)
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    # SqlAlchemy's polymorphic nature
    __mapper_args__ = {
        "order_by": username.desc()
    }

# Or define the table by creating a Table object
user_favorites = db.Table('user_favorites',
    db.Column('user_id', db.Integer, db.ForeignKey('user.id')),
    db.Column('message_id', db.Integer, db.ForeignKey('message.id')),
    info={'bind_key': 'users'})

Basic Usage (CRUD)

A simple query

  1. add

Use db.session to add data

user = User(username='Jack', email='[email protected]')
db.session.add(user)
db.session.commit()  # manually commit
user = User(username='Vic', email='[email protected]')
info = UserInfo(user=user, company='Google')
# add multiple objects
db.session.add_all([user, info])
db.session.commit()

  2. delete

The delete method of a Query object

The update and delete methods accept an optional parameter, shown here as synchronize_session='evaluate'. synchronize_session takes three values:

  • False: Updates the database, but does not update the objects already loaded in the session
  • fetch: Updates the database and re-reads the affected rows so that the objects in the session match
  • evaluate: Evaluates the filter criteria in Python against the objects in the session; if the session cannot evaluate the criteria, an InvalidRequestError is raised

rows = db.session.query(User).filter(User.id == 101).delete(synchronize_session='evaluate')
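The difference between False and 'fetch' is easiest to see on update. The following is a small sketch, assuming an in-memory SQLite database and a hypothetical Member model: with False the loaded object keeps its old value although the row has changed, while with 'fetch' the loaded object ends up matching the database.

```python
from sqlalchemy import Column, Integer, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Member(Base):  # hypothetical model, for illustration only
    __tablename__ = "member"
    id = Column(Integer, primary_key=True)
    age = Column(Integer)


engine = create_engine("sqlite://")  # in-memory database (assumption)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

first = Member(id=1, age=10)
second = Member(id=2, age=10)
session.add_all([first, second])
session.flush()  # both objects are now persistent in the session

# False: the row changes in the database, the loaded object is left untouched.
session.query(Member).filter(Member.id == 1).update({"age": 20}, synchronize_session=False)
stale_age = first.age
db_age = session.query(Member.age).filter(Member.id == 1).scalar()

# 'fetch': objects matched by the UPDATE are synchronized with the new values.
session.query(Member).filter(Member.id == 2).update({"age": 30}, synchronize_session="fetch")
fresh_age = second.age

session.rollback()
```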

If the relationship is configured with cascade='all, delete-orphan', deleting an object with session.delete(instance) also deletes the related rows. Query.delete() does not apply this cascade, so it can fail on the foreign key constraint, as the first call below shows.

session.delete(instance)

db.session.query(User).filter(User.id == 101).delete()
# IntegrityError: (pymysql.err.IntegrityError) (1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`test`.`user_info`, CONSTRAINT `user_info_ibfk_1` FOREIGN KEY (`user_id`) REFERENCES `dev_user` (`id`))')

instance = db.session.query(User).filter(User.id == 101).scalar()
db.session.delete(instance)
# Deleted successfully
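The cascade behaviour of session.delete(instance) can be sketched with a pair of hypothetical models, Owner and Profile, on an in-memory SQLite database (all assumptions, standing in for User and UserInfo). With cascade="all, delete-orphan" on the relationship, deleting the parent through the session removes the child rows as well.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()


class Owner(Base):  # hypothetical parent model
    __tablename__ = "owner"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))
    profiles = relationship("Profile", cascade="all, delete-orphan", backref="owner")


class Profile(Base):  # hypothetical child model
    __tablename__ = "profile"
    id = Column(Integer, primary_key=True)
    owner_id = Column(Integer, ForeignKey("owner.id"))
    company = Column(String(100))


engine = create_engine("sqlite://")  # in-memory database (assumption)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

session.add(Owner(id=101, name="Vic", profiles=[Profile(company="Google")]))
session.commit()

instance = session.query(Owner).filter(Owner.id == 101).one()
session.delete(instance)  # the session deletes the Profile rows first, then the Owner
session.commit()

remaining_profiles = session.query(Profile).count()
remaining_owners = session.query(Owner).count()
```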
  3. update

There are two ways: modify an object's attributes and commit, or update through the query.

# Modify object properties
user = db.session.query(User).filter(User.id == 102).first()
user.username = 'Thomas'
db.session.add(user)
db.session.commit()
# Update through the query
rows = db.session.query(User).filter(User.id < 100).update({'is_active': 0})
db.session.commit()

  4. query

filter_by

filter_by takes keyword arguments; each one is an equality (=) condition, and multiple conditions are joined with AND. It returns a query object.

# scalar() returns at most one result: None if there are no rows, and it raises MultipleResultsFound if there are multiple rows
db.session.query(User).filter_by(id=101).scalar()
# Filters on dev_user.id = 101
db.session.query(User, UserInfo).filter_by(id=101).all()

filter

filter accepts different types of expressions as arguments. Multiple filter criteria can be passed, and it returns a query object.

The primary key ID can be queried directly with GET

db.session.query(User).get(100)
# Returns a single object, or None if no row has that primary key

Unlike Django's ORM, SQLAlchemy expresses clauses such as ORDER BY and AS through the model's column attributes, for example with desc() and label().

from sqlalchemy import desc, asc
db.session.query(User).order_by(desc(User.age))
db.session.query(User.id).filter().order_by(asc(User.id)).offset(10).limit(10).all()
# SQL:
# SELECT dev_user.id AS dev_user_id
# FROM dev_user ORDER BY dev_user.id ASC
# LIMIT %(param_1)s, %(param_2)s
# {'param_1': 10, 'param_2': 10}
db.session.query(User.username.label('name')).filter()
# alias username as name
# SQL:
# SELECT dev_user.username AS name
# FROM dev_user

First, the methods for getting data from a query: all(), first(), one(), one_or_none(), scalar() and get()

  • all(): Returns all rows

  • first(): Returns the first row, or None if there is no result. No exception is thrown

  • one(): Exactly one row must be returned. An exception is thrown if there are multiple rows or no data

  • one_or_none(): One row or no data must be returned. An exception is thrown if there is more than one row

  • scalar(): One row or no data must be returned; multiple rows throw an exception. scalar() returns only the first column of that row, or None if there is no data
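The rules in the list above can be checked with a short sketch. It assumes an in-memory SQLite database and a hypothetical Person model holding two rows; the outcome helper is also hypothetical and only records either the returned value or the name of the exception raised.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.exc import MultipleResultsFound, NoResultFound
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Person(Base):  # hypothetical model, for illustration only
    __tablename__ = "person"
    id = Column(Integer, primary_key=True)
    name = Column(String(50))


engine = create_engine("sqlite://")  # in-memory database (assumption)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([Person(name="Ann"), Person(name="Bob")])
session.commit()

everyone = session.query(Person)                            # matches two rows
nobody = session.query(Person).filter(Person.name == "Zed")  # matches no rows


def outcome(fetch):
    """Return the result of fetch(), or the name of the exception it raises."""
    try:
        return fetch()
    except (NoResultFound, MultipleResultsFound) as exc:
        return type(exc).__name__


results = {
    "first, no rows": outcome(nobody.first),
    "one, no rows": outcome(nobody.one),
    "one_or_none, no rows": outcome(nobody.one_or_none),
    "scalar, no rows": outcome(nobody.scalar),
    "one, two rows": outcome(everyone.one),
    "one_or_none, two rows": outcome(everyone.one_or_none),
    "scalar, two rows": outcome(everyone.scalar),
    "all, two rows": len(everyone.all()),
}
```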

=, <, > query

# from sqlalchemy import true
db.session.query(User.id, User.username).filter(User.is_active == db.true())

# Multiple conditions passed to filter() are combined with AND
db.session.query(User.username, UserInfo.company).filter(User.id==103, User.username=='Zachary Moreno').all()
# Combined queries can also use and_, which translates to the same SQL
from sqlalchemy import and_, or_
db.session.query(User.username, UserInfo.company).filter(and_(User.id==103, User.username=='Zachary Moreno')).all()
# Translated SQL:
# SELECT dev_user.username AS dev_user_username, user_info.company AS user_info_company
# FROM dev_user, user_info
# WHERE dev_user.id = %(id_1)s AND dev_user.username = %(username_1)s
# params: {'id_1': 103, 'username_1': 'Zachary Moreno'}

db.session.query(User).filter(User.id < 20).all()
# create_at is not defined in the demo model above; it stands for a datetime column
db.session.query(User).filter(User.create_at > '2020-02-02 00:00:00')

Fuzzy query

db.session.query(User.username).filter(User.email.startswith('123')).all()
db.session.query(User.username).filter(User.email.like('123%'))
# SELECT dev_user.username from dev_user where dev_user.email like '123%'
db.session.query(User.username).filter(User.email.endswith('123')).all()

join

Join joins tables

db.session.query(Comment.comment.label('user_comment'), User.id.label('user_id'), User.username.label('username')).join(User, User.id==Comment.user_id).filter(User.id<100).order_by(User.id.desc()).all()  # desc is the method of the field
# SQL:
# SELECT comment.comment AS user_comment, dev_user.id AS user_id, dev_user.username AS username
# FROM comment INNER JOIN dev_user ON dev_user.id = comment.user_id
# WHERE dev_user.id < %(id_1)s ORDER BY dev_user.id DESC
# params: {'id_1': 100}

Use SQLAlchemy's aliased() when joining the same table more than once

from sqlalchemy.orm import aliased
Comment_alias = aliased(Comment, name='comment_alias')
db.session.query(User.id, User.username, User.email, User.phone, Comment.comment).join(Comment, Comment.user_id == User.id).join(Comment_alias, Comment_alias.user_id == User.id).all()
# SQL:
# SELECT dev_user.id AS dev_user_id, dev_user.username AS dev_user_username, dev_user.email AS dev_user_email, dev_user.phone AS dev_user_phone, comment.comment 
# FROM dev_user INNER JOIN comment ON comment.user_id = dev_user.id INNER JOIN comment AS comment_alias ON comment_alias.user_id = dev_user.id

offset/limit

from sqlalchemy import true

db.session.query(User.username, User.phone, User.email).filter(User.is_active == true()).order_by(User.age.desc()).offset(10).limit(20)
# SQL:
# SELECT dev_user.username AS dev_user_username, dev_user.phone AS dev_user_phone, dev_user.email AS dev_user_email
# FROM dev_user
# WHERE dev_user.is_active = true ORDER BY dev_user.age DESC
# LIMIT %(param_1)s, %(param_2)s
# {'param_1': 10, 'param_2': 20}


in/not in/is null/is not null

# in
db.session.query(User).filter(User.id.in_([100, 103, 105]))
# not in
db.session.query(User).filter(~User.id.in_([100, 103, 105]))
# is null
from sqlalchemy import null
db.session.query(User).filter(User.email == null())
# is not null
db.session.query(User).filter(User.email != null())
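The four filters can be exercised together in a short sketch, assuming an in-memory SQLite database and a hypothetical Contact model with made-up ids and example.com addresses. Comparing a column with null() renders IS NULL / IS NOT NULL rather than = NULL.

```python
from sqlalchemy import Column, Integer, String, create_engine, null
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class Contact(Base):  # hypothetical model, for illustration only
    __tablename__ = "contact"
    id = Column(Integer, primary_key=True)
    email = Column(String(120), nullable=True)


engine = create_engine("sqlite://")  # in-memory database (assumption)
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([
    Contact(id=100, email="a@example.com"),
    Contact(id=103, email=None),
    Contact(id=105, email="c@example.com"),
    Contact(id=107, email=None),
])
session.commit()


def ids(condition):
    """Return the ids of the contacts matching the condition, in ascending order."""
    query = session.query(Contact.id).filter(condition).order_by(Contact.id)
    return [row[0] for row in query]


in_ids = ids(Contact.id.in_([100, 103, 105]))       # IN
not_in_ids = ids(~Contact.id.in_([100, 103, 105]))  # NOT IN
null_ids = ids(Contact.email == null())             # IS NULL
not_null_ids = ids(Contact.email != null())         # IS NOT NULL
```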

select_from

select_from sets the table the query selects FROM, which becomes the left side of the join; the entities passed to query determine which columns are returned.

db.session.query(User).select_from(Comment).join(User, Comment.user_id == User.id).all()
# Only the data in User is returned, that is, the model passed to query
# SELECT dev_user.id AS dev_user_id, dev_user.username AS dev_user_username, dev_user.email AS dev_user_email, dev_user.is_active AS dev_user_is_active, dev_user.phone AS dev_user_phone, dev_user.age AS dev_user_age
# FROM comment INNER JOIN dev_user ON dev_user.id = comment.user_id