Notes
Prefer session.query over Model.query; Model.query only works when the database data is mapped to the model.
session.add() puts the object into the session's pending state. Calling session.query afterwards forces the pending SQL to execute (autoflush) before the query runs. Model.query does not appear to force the pending SQL to execute. (The underlying principles and implementation still need further study.)
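The pending state and the flush triggered by session.query can be observed directly. The following is a minimal sketch, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database standing in for MySQL; the cut-down User model is only for illustration.

```python
from sqlalchemy import Column, Integer, String, create_engine, inspect
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "dev_user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))


# In-memory SQLite is an assumption of this sketch, not the article's setup.
engine = create_engine("sqlite://")
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine, autoflush=True)()

user = User(username="Jack")
session.add(user)
pending_before = inspect(user).pending  # the INSERT has not been sent yet

# Running a query triggers autoflush, so the INSERT is emitted first.
found = session.query(User).filter_by(username="Jack").first()
pending_after = inspect(user).pending   # the object has been flushed

session.rollback()
```

With autoflush=False on the sessionmaker, the same query would run without flushing and would not find the new row.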
session
Start by creating a db object, which wraps everything Flask_SQLAlchemy does with the database
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
app = Flask(__name__)
db = SQLAlchemy(app, engine_options={'pool_recycle': 3600, 'pool_size': 20})
# session = db.session
Flask_SQLAlchemy needs a MySQL driver. Installing one with pip install mysql-python does not work on Python 3, which that package does not support. The install fails with ModuleNotFoundError: No module named 'ConfigParser', because Python 3 renamed the ConfigParser module to configparser.
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
# dialect+driver://user:password@host:port/database?charset=utf8
mysql_uri = "mysql+pymysql://root:[email protected]:3306/SQLAlchemyTest?charset=utf8"
# paramstyle is one of "qmark", "numeric", "named", "format" or "pyformat"
engine = create_engine(
    mysql_uri,
    paramstyle=None,
    echo=True,
    echo_pool=True,
    pool_pre_ping=True,  # pessimistic mode: ping the connection with a SELECT on each checkout and retry if it fails
)
session_caller = sessionmaker(bind=engine, autoflush=True, autocommit=False, expire_on_commit=True, info=None)
session = session_caller()
# session = scoped_session(session_factory=session_caller, scopefunc=None)
The create_engine parameter paramstyle sets the DBAPI parameter style; the available styles are described in PEP 249 (paramstyle).
PyMySQL is used here as the SQLAlchemy driver, but in connectionless (session) execution SQLAlchemy and PyMySQL can conflict over paramstyle. PyMySQL sets paramstyle to pyformat in its __init__.py, so if you specify paramstyle at all, use None or pyformat; any other value causes an error.
The same operation works properly in Connection mode.
If no transaction is involved, Connection mode is recommended: create the connection with engine.connect() and execute the SQL on it.
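Connection mode can be sketched as follows. This is a minimal example, assuming SQLAlchemy 1.4 or later and an in-memory SQLite database in place of the MySQL URI; the table contents are made up for the demo. Wrapping the statement in text() lets SQLAlchemy translate the named :user_id placeholder into whatever style the driver expects.

```python
from sqlalchemy import create_engine, text

# In-memory SQLite is an assumption; the article connects to MySQL through PyMySQL.
engine = create_engine("sqlite://")

with engine.connect() as conn:
    conn.execute(text("CREATE TABLE dev_user (id INTEGER PRIMARY KEY, username TEXT)"))
    conn.execute(
        text("INSERT INTO dev_user (id, username) VALUES (:id, :username)"),
        {"id": 100, "username": "Jack"},
    )
    # Named placeholders are bound from the dictionary passed as the second argument.
    row = conn.execute(
        text("SELECT id, username FROM dev_user WHERE id = :user_id"),
        {"user_id": 100},
    ).fetchone()
```

Everything here runs on one connection, so the inserted row is visible to the SELECT without going through a session.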
Using named
# use named
engine = create_engine(mysql_uri, paramstyle='named')
session = sessionmaker(engine)()
sql = 'select * from dev_user where id={}'
session.execute(sql.format(':user_id'), {'user_id': 100}).fetchone()
# SQLAlchemy did not convert :user_id; it was passed straight to PyMySQL, which caused the error
# ProgrammingError: (pymysql.err.ProgrammingError) (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near ':user_id' at line 1")
# [SQL: select * from dev_user where id=:user_id]
# [parameters: {'user_id': 100}]
Using pyformat
# use pyformat
engine.paramstyle = 'pyformat'
session.execute(sql.format('%(user_id)s'), {'user_id': 100}).fetchone()
# SQLAlchemy did not pick up the params (the placeholder was escaped to %%)
# ProgrammingError: (pymysql.err.ProgrammingError) (1064, "You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near '%(user_id)s' at line 1")
# [SQL: select * from dev_user where id=%%(user_id)s]
# Using named-style parameters works
session.execute(sql.format(':user_id'), {'user_id': 100}).fetchone()
# SQL:
# select * from dev_user where id=%(user_id)s
# {'user_id': 100}
SQLAlchemy can be used in three ways:
- raw SQL
- SQL expression
- ORM
If SQLAlchemy is used with cx_Oracle, you need to disable the connection pool, otherwise exceptions will occur. Set SQLAlchemy's poolclass to sqlalchemy.pool.NullPool.
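Disabling the pool comes down to passing poolclass to create_engine. A minimal sketch follows; the SQLite URL is only a placeholder so the snippet runs without an Oracle server, and in practice it would be replaced by the real cx_Oracle URL.

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

# Placeholder URL (assumption); substitute the actual Oracle connection URL.
engine = create_engine("sqlite://", poolclass=NullPool)

# With NullPool every checkout opens a fresh connection and closes it on release.
pool_name = type(engine.pool).__name__
```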
Support for transactions
Creating a connection starts a transaction. A savepoint can be created for data that has been added but not yet committed
db.session.add(u1)
# Create a savepoint
db.session.begin_nested()
db.session.add(u2)
db.session.rollback()
# Rolls back only u2, not u1
db.session.commit()
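The savepoint behaviour can be tried outside Flask as well. The sketch below assumes SQLAlchemy 1.4 or later, in-memory SQLite and a cut-down User model. It calls rollback() on the object returned by begin_nested() rather than on the session, because on SQLAlchemy 2.0 session.rollback() rolls back the whole transaction and not only the savepoint.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "dev_user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))


engine = create_engine("sqlite://")  # in-memory SQLite, assumed for the demo
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()

u1 = User(username="u1")
u2 = User(username="u2")

session.add(u1)
savepoint = session.begin_nested()  # flushes u1, then emits SAVEPOINT
session.add(u2)
savepoint.rollback()                # rolls back to the savepoint: only u2 is discarded
session.commit()                    # u1 is committed

names = [name for (name,) in session.query(User.username).all()]
```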
Here is the database model demo
from sqlalchemy.dialects.mysql import INTEGER

class User(db.Model):
    __tablename__ = "dev_user"
    id = db.Column(INTEGER(unsigned=True), primary_key=True)
    username = db.Column(db.String(80), unique=True, doc="Username", comment="User id")
    email = db.Column(db.String(120), unique=True, doc="User email", comment="User email")
    is_active = db.Column(db.Boolean, default=False, doc="Active", comment="Is it active?")
    phone = db.Column(db.String(20), index=True, doc="User's Mobile phone Number", comment="User's Mobile phone Number")
    age = db.Column(INTEGER(), doc="User age", comment="User age")

    def __repr__(self):
        return f"<User {self.username!r}>"

class Blog(db.Model):
    __tablename__ = "blog"
    id = db.Column(INTEGER(unsigned=True), primary_key=True)
    content = db.Column(db.Text, comment="Blog content")
    user_id = db.Column(INTEGER(unsigned=True), db.ForeignKey("dev_user.id"), index=True,
                        doc="User id", comment="User id")

class Comment(db.Model):
    __tablename__ = "comment"
    id = db.Column(INTEGER(unsigned=True), primary_key=True)
    comment = db.Column(db.String(150), doc="User Reviews", comment="User Reviews")
    user_id = db.Column(INTEGER(unsigned=True), index=True, doc="User id", comment="User id")

class UserInfo(db.Model):
    __tablename__ = "user_info"
    id = db.Column(INTEGER(unsigned=True), primary_key=True)
    user_id = db.Column(INTEGER(unsigned=True), db.ForeignKey("dev_user.id"),
                        index=True, doc="User id", comment="User id")
    user = db.relationship("User", backref="user_info", uselist=False)
    company = db.Column(db.String(100), unique=True, doc="Company abbreviation", comment="Company abbreviation")
Multiple database support
Flask_SQLAlchemy uses the concept of binds to connect to multiple databases
# Set them in the config class
class Config(object):
    SQLALCHEMY_DATABASE_URI = 'postgres://localhost/main'
    SQLALCHEMY_BINDS = {
        'users': 'mysqldb://localhost/users',
        'appmeta': 'sqlite:////path/to/appmeta.db'
    }
# Or set them through the app.config attribute
app.config["SQLALCHEMY_DATABASE_URI"] = 'postgres://localhost/main'
app.config["SQLALCHEMY_BINDS"] = {
    'users': 'mysqldb://localhost/users',
    'appmeta': 'sqlite:////path/to/appmeta.db'
}
Many Flask_SQLAlchemy methods accept a bind parameter that selects one of the configured databases, for example
db.session.execute(sql, bind='user')
The differences in SQL are as follows:
-- Without bind
SELECT user.username AS username_1 FROM user WHERE user.id = 100;
-- With bind: table names are prefixed with the database name
SELECT test.user.username AS username_1 FROM test.user WHERE test.user.id = 100;
Use multiple database support in model definitions
class User(db.Model):
    __tablename__ = "user"
    # Setting __bind_key__ selects the given database directly when SQL statements are executed
    __bind_key__ = 'users'
    # Or use __table_args__
    __table_args__ = {
        'schema': "users",  # specified bind
        'mysql_engine': 'InnoDB',
        'mysql_charset': 'utf8mb4',
    }
    # __table_args__ can also be a tuple
    # __table_args__ = (
    #     db.UniqueConstraint("username", "id", name="uix_username_id"),
    #     db.Index("ix_username", "username")
    # )
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True)
    # Mapper options (also used for SQLAlchemy's polymorphism)
    __mapper_args__ = {
        "order_by": username.desc()
    }
# Or create a table model by creating a Table object
user_favorites = db.Table('user_favorites',
    db.Column('user_id', db.Integer, db.ForeignKey('user.id')),
    db.Column('message_id', db.Integer, db.ForeignKey('message.id')),
    info={'bind_key': 'users'})
Basic Usage (CRUD)
A simple query
- Add
Use db.session to add data
user = User(username='Jack', email='[email protected]')
db.session.add(user)
db.session.commit() # manually commit
user = User(username='Vic', email='[email protected]')
info = UserInfo(user=user, company='Google')
# add multiple objects
db.session.add_all([user, info])
db.session.commit()
- Delete
The delete method of a Query object
The update and delete methods accept an optional parameter, for example synchronize_session='evaluate'. synchronize_session has three values:
- False: updates the database data, but does not update the objects already loaded in the session
- 'fetch': updates the database data and reads the affected data from the database again
- 'evaluate': evaluates the filter criteria in Python against the objects in the session; if the criteria cannot be evaluated, the session cannot determine what to do and an InvalidRequestError exception is raised
rows = db.session.query(User).filter(User.id == 101).delete(synchronize_session='evaluate')
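The difference between the synchronize_session values is easiest to see on an object that is already loaded. The following is a small sketch, assuming SQLAlchemy 1.4 or later, in-memory SQLite and a cut-down User model with made-up data.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "dev_user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))
    age = Column(Integer)


engine = create_engine("sqlite://")  # in-memory SQLite, assumed for the demo
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(User(id=101, username="Jack", age=20))
session.commit()

user = session.query(User).filter(User.id == 101).one()  # loaded into the session

# False: the row changes in the database, the loaded object is left untouched.
session.query(User).filter(User.id == 101).update({"age": 30}, synchronize_session=False)
age_after_false = user.age

# 'evaluate': the criteria are evaluated in Python and matching objects are updated too.
session.query(User).filter(User.id == 101).update({"age": 40}, synchronize_session="evaluate")
age_after_evaluate = user.age

# delete() returns the number of rows matched.
rows = session.query(User).filter(User.id == 101).delete(synchronize_session="fetch")
session.commit()
```

After the update with False, the stale value stays on the object until it is expired or refreshed, for example by a commit.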
Query.delete() emits the DELETE statement directly and does not apply the ORM cascade, so it can fail on a foreign key constraint. If the relationship sets cascade='all,delete-orphan', deleting through session.delete(instance) also deletes the data in the related table.
# Delete through the query: fails on the foreign key constraint
db.session.query(User).filter(User.id == 101).delete()
# IntegrityError: (pymysql.err.IntegrityError) (1451, 'Cannot delete or update a parent row: a foreign key constraint fails (`test`.`user_info`, CONSTRAINT `user_info_ibfk_1` FOREIGN KEY (`user_id`) REFERENCES `dev_user` (`id`))')
# Delete through the session
instance = db.session.query(User).filter(User.id == 101).scalar()
db.session.delete(instance)
# The delete succeeds
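A cascading delete through the session might look like the sketch below. It assumes SQLAlchemy 1.4 or later and in-memory SQLite; the relationship name infos is hypothetical, and the cascade is declared on the User side, which differs from the backref used in the demo models.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "dev_user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))
    # Hypothetical relationship: deleting a User through the session
    # also deletes its UserInfo rows.
    infos = relationship("UserInfo", cascade="all, delete-orphan")


class UserInfo(Base):
    __tablename__ = "user_info"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("dev_user.id"))
    company = Column(String(100))


engine = create_engine("sqlite://")  # in-memory SQLite, assumed for the demo
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add(User(id=101, username="Jack", infos=[UserInfo(company="Google")]))
session.commit()

instance = session.query(User).filter(User.id == 101).one()
session.delete(instance)  # the ORM deletes the related UserInfo rows first
session.commit()

remaining_users = session.query(User).count()
remaining_infos = session.query(UserInfo).count()
```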
- Update
There are two ways to modify data: change an attribute on the object and commit, or call update on the query
# Modify object properties
user = db.session.query(User).filter(User.id == 102).first()
user.username = 'Thomas'
db.session.add(user)
db.session.commit()
# Update through the query; update() returns the number of rows matched
rows = db.session.query(User).filter(User.id < 100).update({'is_active': 0})
db.session.commit()
- Query
filter_by
filter_by takes keyword arguments, so its conditions can only be equality (=) tests joined with AND; it returns a query object
# scalar() returns at most one result: None if there are no rows, and a MultipleResultsFound exception if there are multiple rows
db.session.query(User).filter_by(id=101).scalar()
# Equivalent to: select * from dev_user where id = 101
db.session.query(User, UserInfo).filter_by(id=101).all()
filter
filter accepts different kinds of expressions as arguments; several filter criteria can be passed at once, and it returns a query object
A row can be fetched directly by its primary key with get
db.session.query(User).get(100)
# As with Django's get, a single object is returned; here, None is returned when no row matches
Unlike Django's ORM, SQLAlchemy expresses modifiers such as ordering (order_by) and aliasing (AS) through the model's column attributes
from sqlalchemy import desc, asc
query.order_by(desc(User.owned))
db.session.query(User.id).filter().order_by(asc(User.id)).offset(10).limit(10).all()
# SQL:
# SELECT dev_user.id AS dev_user_id
# FROM dev_user ORDER BY dev_user.id ASC
# LIMIT %(param_1)s, %(param_2)s
# {'param_1': 10, 'param_2': 10}
db.session.query(User.username.label('name')).filter()
# alias username as name
# SQL:
# SELECT dev_user.username as name
# FROM dev_user
Here are the methods for getting data from a query: all(), first(), one(), scalar(), get(), one_or_none()
- all(): returns all rows
- first(): returns the first row, or None if there is no result; no exception is raised
- one(): exactly one row must be returned; an exception is raised if there are multiple rows or no rows
- one_or_none(): one row or no rows must be returned; an exception is raised if there is more than one row
- scalar(): one row or no rows must be returned, and multiple rows raise an exception; only the first column of the first row is returned
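The fetch methods can be compared side by side. This is a minimal sketch, assuming SQLAlchemy 1.4 or later, in-memory SQLite and two made-up rows; the raised helper is hypothetical and only records which exception a call produces.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker
from sqlalchemy.orm.exc import MultipleResultsFound, NoResultFound

Base = declarative_base()


class User(Base):
    __tablename__ = "dev_user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))


engine = create_engine("sqlite://")  # in-memory SQLite, assumed for the demo
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([User(id=1, username="Ann"), User(id=2, username="Bob")])
session.commit()


def raised(call):
    """Return the name of the exception a fetch method raises, or None."""
    try:
        call()
    except (NoResultFound, MultipleResultsFound) as exc:
        return type(exc).__name__
    return None


many = session.query(User)                             # matches two rows
nothing = session.query(User).filter(User.id == 999)   # matches no rows

all_count = len(many.all())
first_of_nothing = nothing.first()
one_or_none_of_nothing = nothing.one_or_none()
scalar_value = session.query(User.username).filter(User.id == 1).scalar()

errors = {
    "one() on no rows": raised(nothing.one),
    "one() on two rows": raised(many.one),
    "one_or_none() on two rows": raised(many.one_or_none),
    "scalar() on two rows": raised(many.scalar),
}
```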
=, <, > queries
# from sqlalchemy import true
db.session.query(User.id, User.username).filter(User.is_active==db.true())
# Multiple conditions passed to filter are combined with AND
db.session.query(User.username, UserInfo.company).filter(User.id==103, User.username=='Zachary Moreno').all()
# Combined queries can also use and_, which translates to the same SQL
from sqlalchemy import and_, or_
db.session.query(User.username, UserInfo.company).filter(and_(User.id==103, User.username=='Zachary Moreno')).all()
# Translated into SQL:
# SELECT dev_user.username AS dev_user_username, user_info.company AS user_info_company
# FROM dev_user, user_info
# WHERE dev_user.id = %(id_1)s AND dev_user.username = %(username_1)s
# params: {'id_1': 103, 'username_1': 'Zachary Moreno'}
db.session.query(User).filter(User.id < 20).all()
db.session.query(User).filter(User.create_at > '2020-02-02 00:00:00')
Fuzzy query
db.session.query(User.username).filter(User.email.startswith('123')).all()
db.session.query(User.username).filter(User.email.like('123%'))
# SELECT dev_user.username from dev_user where dev_user.email like '123%'
db.session.query(User.username).filter(User.email.endswith('123')).all()
join
join joins tables
db.session.query(Comment.comment.label('user_comment'), User.id.label('user_id'), User.username.label('username')).join(User, User.id==Comment.user_id).filter(User.id<100).order_by(User.id.desc()).all() # desc() is a method on the column
# SQL:
# SELECT comment.comment AS user_comment, dev_user.id AS user_id, dev_user.username AS username
# FROM comment INNER JOIN dev_user ON dev_user.id = comment.user_id
# WHERE dev_user.id < %(id_1)s ORDER BY dev_user.id DESC
# params: {'id_1': 100}
You can use SQLAlchemy's aliased to join the same table more than once
from sqlalchemy.orm import aliased
Comment_alias = aliased(Comment, name='comment_alias')
db.session.query(User.id, User.username, User.email, User.phone, Comment.comment).join(Comment, Comment.user_id == User.id).join(Comment_alias, Comment_alias.user_id == User.id).all()
# SQL:
# SELECT dev_user.id AS dev_user_id, dev_user.username AS dev_user_username, dev_user.email AS dev_user_email, dev_user.phone AS dev_user_phone, comment.comment AS comment_comment
# FROM dev_user INNER JOIN comment ON comment.user_id = dev_user.id INNER JOIN comment AS comment_alias ON comment_alias.user_id = dev_user.id
offset/limit
from sqlalchemy import true
db.session.query(User.username, User.phone, User.email).filter(User.is_active == true()).order_by(User.age.desc()).offset(10).limit(20)
# SQL:
# SELECT dev_user.username AS dev_user_username, dev_user.phone AS dev_user_phone, dev_user.email AS dev_user_email
# FROM dev_user
# WHERE dev_user.is_active = true ORDER BY dev_user.age DESC
# LIMIT %(param_1)s, %(param_2)s
# {'param_1': 10, 'param_2': 20}
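Paging with offset and limit can be checked on a small data set. The sketch below assumes SQLAlchemy 1.4 or later, in-memory SQLite and thirty made-up rows with ids 1 to 30.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "dev_user"
    id = Column(Integer, primary_key=True)
    username = Column(String(80))


engine = create_engine("sqlite://")  # in-memory SQLite, assumed for the demo
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([User(id=i, username=f"user{i}") for i in range(1, 31)])
session.commit()

# Order by id descending, skip the first 10 rows, then take the next 5.
page = session.query(User.id).order_by(User.id.desc()).offset(10).limit(5).all()
ids = [row[0] for row in page]
```

An explicit order_by matters here: without it the database is free to return rows in any order, so the pages are not stable.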
in/not in/is null/is not null
# in
db.session.query(User).filter(User.id.in_([100, 103, 105]))
# not in
db.session.query(User).filter(~User.id.in_([100, 103, 105]))
# is null
from sqlalchemy import null
db.session.query(User).filter(User.email == null())
# is not null
db.session.query(User).filter(User.email != null())
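The four predicates can be exercised together. This is a minimal sketch, assuming SQLAlchemy 1.4 or later, in-memory SQLite and made-up rows; it uses is_(None) and isnot(None), which render the same IS NULL and IS NOT NULL SQL as the comparisons with null(). The ids helper is hypothetical.

```python
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import declarative_base, sessionmaker

Base = declarative_base()


class User(Base):
    __tablename__ = "dev_user"
    id = Column(Integer, primary_key=True)
    email = Column(String(120))


engine = create_engine("sqlite://")  # in-memory SQLite, assumed for the demo
Base.metadata.create_all(engine)
session = sessionmaker(bind=engine)()
session.add_all([
    User(id=100, email="a@example.com"),
    User(id=103, email=None),
    User(id=105, email="c@example.com"),
    User(id=107, email=None),
])
session.commit()


def ids(query):
    """Return the matching ids in ascending order."""
    return [row[0] for row in query.order_by(User.id).all()]


base = session.query(User.id)
in_ids = ids(base.filter(User.id.in_([100, 103, 105])))
not_in_ids = ids(base.filter(~User.id.in_([100, 103, 105])))
null_ids = ids(base.filter(User.email.is_(None)))
not_null_ids = ids(base.filter(User.email.isnot(None)))
```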
select_from
select_from sets the table the query selects FROM (the left side of the join), while the model passed to query decides which columns are returned
db.session.query(User).select_from(Comment).join(User, Comment.user_id == User.id).all()
# Only the data in User is returned, that is, the model passed to query
# SELECT dev_user.id AS dev_user_id, dev_user.username AS dev_user_username, dev_user.email AS dev_user_email, dev_user.is_active AS dev_user_is_active, dev_user.phone AS dev_user_phone, dev_user.age AS dev_user_age
# FROM comment INNER JOIN dev_user ON dev_user.id = comment.user_id