Gino ORM: first-use impressions that are hard to sum up in a word

Until now, I had always used Peewee for the database side of our business logic. But Peewee is synchronous, so it does not fit well with our asynchronous features. There is a corresponding asynchronous library, peewee_async, but out of curiosity I wanted to try out other asynchronous ORMs.

There are several ORM options for asyncio database access:

  • Gino (a bit painful to use, mainly because reading its source code made me dizzy; I do not understand it well enough yet)

  • Tortoise ORM (tortoise-orm)

  • SQLAlchemy, which added sqlalchemy.ext.asyncio.AsyncSession in version 1.4 (www.osgeo.cn/sqlalchemy/)…

  • peewee_async, a third-party asynchronous implementation for Peewee (peewee-async.readthedocs.io/en/latest/)

  • SQLAlchemy sync library + Ormar (github.com/collerek/or…)

In practice:

  • Tortoise ORM (tortoise-orm): some things still do not work right! For example, index=True ends up being treated as the primary key.

  • Gino is mainly a domestically developed implementation based on SQLAlchemy. Its advantage is that you can generate the corresponding models directly from existing tables. For simple single-table queries it works nicely. But for operations that span multiple tables not linked by foreign keys, it can get relatively complicated, mainly because the official documentation does not have enough examples. I also skimmed the source code, but it still looked fairly complex. Here are some practical notes I took while getting started with Gino.

Gino ORM Practice:

Because the tables already exist in the production environment, the models need to be generated from the tables (rather than written by hand one at a time). For this you can use sqlacodegen, the model generator for SQLAlchemy, to generate the corresponding models from the tables.

1. Generate models from data tables:

The generation command is as follows:

sqlacodegen --noconstraints --outfile ./sysmodels.py postgres://postgres:123456@127.0.0.1:5432/zyxadminsystem

2. Modify the generated model so that it fits

2.1 Define the base model classes

import asyncpg
from gino.dialects.asyncpg import Pool as BasePool
from gino import Gino
from typing import List


class Pool(BasePool):
    """
    Pool allow to provide multiple postgresql hosts, since BasePool couldn't do that
    because it sets hosts implicitly
    """

    async def _init(self):
        args = self._kwargs.copy()
        args.update(
            loop=self._loop,
        )
        dsn = str(self._url)
        if dsn.startswith('postgresql+asyncpg') or dsn.startswith('postgres+asyncpg'):
            dsn = dsn.replace('+asyncpg', '', 1)
        self._pool = await asyncpg.create_pool(dsn, **args)
        return self


db = Gino()


class Base(db.Model):
    # self.to_dict()

    def __init__(self, **kw):
        super().__init__(**kw)
        self._children = set()

    @property
    def children(self):
        return self._children

    @children.setter
    def add_child(self, child):
        self._children.add(child)

    # Convert a single object to a dict
    def single_to_dict(self):
        return {c.name: getattr(self, c.name) for c in self.__table__.columns}

    def cust_to_dict(self):
        return {c.name: getattr(self, c.name, None) for c in self.__table__.columns}

    def to_dict_exclude_field(self, fields: List[str] = None):
        if fields:
            return {key: val for key, val in self.to_dict().items() if key not in fields}
        return self.to_dict()

    # Convert to a dict, turning non-None values into strings
    def dobule_to_dict(self):
        result = {}
        for key in self.__mapper__.c.keys():
            if getattr(self, key) is not None:
                result[key] = str(getattr(self, key))
            else:
                result[key] = getattr(self, key)
        return result

    # Function used with multiple objects
    def dobule_to_json(self, all_vendors):
        v = [ven.dobule_to_dict() for ven in all_vendors]
        return v

    # Function used with multiple objects: converts a result set in list form
    @staticmethod
    async def dobules_to_json_forsinge(all_vendors):
        v = [ven.to_dict() for ven in all_vendors]
        return v

    # @classmethod
    # def children_handle(cls,model,exclude_field=None):
    #     return [ model.to_dict_exclude_field(exclude_field) for ch in cls.children][0]
    # _result_user_depart = [ children_handle(item) for item in parents]

    @classmethod
    async def get_one_or_none(cls, where):
        return await cls.query.where(where).gino.one_or_none()

    @classmethod
    async def get_all(cls, where):
        return await cls.query.where(where).gino.all()

    @classmethod
    async def store_count(cls):
        # test failed
        return await db.func.count(cls.id).gino.scalar()


Column = db.Column
Integer = db.Integer
Text = db.Text
String = db.String
DateTime = db.DateTime
Date = db.Date
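The field-exclusion helper in the base class is essentially a dict comprehension over the model's dict form. A minimal standalone sketch of that idea, with a plain dict standing in for the output of to_dict() (the function name exclude_fields and the sample row are hypothetical and not part of Gino):

```python
from typing import Any, Dict, List, Optional


def exclude_fields(row: Dict[str, Any], fields: Optional[List[str]] = None) -> Dict[str, Any]:
    """Return a copy of row without the keys listed in fields."""
    if fields:
        return {key: val for key, val in row.items() if key not in fields}
    return dict(row)


# Hypothetical row, standing in for what a model's to_dict() might return
user = {"id": 1, "username": "admin", "password": "secret", "salt": "123465"}
public = exclude_fields(user, ["password", "salt"])
print(public)
```

This kind of filter is handy for dropping sensitive columns such as password or salt before a record is returned to a caller.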

2.2 Implement corresponding model and table mapping

from apps.plugs.admin.database import *
from sqlalchemy import *
from sqlalchemy.orm import relationship
# import logging
# logging.getLogger("gino").setLevel(logging.INFO)
import logging
# logging.basicConfig(level=logging.INFO)


class ModelBase(Base):
    create_time = Column(DateTime, comment='creation time')
    # update_time = db.Column(db.date, …
    update_time = Column(DateTime, comment='update time')


class SysDepart(ModelBase):
    __tablename__ = 'sys_depart'
    __table_args__ = {'comment': '…'}

    id = Column(Integer, primary_key=True, autoincrement=True, comment='primary key ID')
    parent_id = Column(Integer, index=True, comment='parent ID')
    depart_name = Column(Text, comment='current department name')
    sort_no = Column(Integer, comment='…')
    description = Column(Text, comment='description')
    org_category = Column(Text, comment='…')
    org_code = Column(Text, index=True, comment='…')
    remark = Column(Text, comment='information')
    status = Column(Integer, index=True, comment='status (1 - normal, 2 - frozen)')
    mobile = Column(Text, comment='mobile number')
    fax = Column(Text, comment='fax')
    address = Column(Text, comment='…')
    del_flag = Column(Integer, index=True, comment='delete status (0 - normal, 1 - deleted)')
    create_by = Column(Text, default='now()', comment='creator')
    update_by = Column(Text, comment='updater')


class SysDepartPermission(ModelBase):
    __tablename__ = 'sys_depart_permission'
    __table_args__ = {'comment': '…'}

    id = Column(Integer, primary_key=True, comment='primary key id')
    depart_id = Column(Integer, index=True, default="1", comment='department id')
    permission_id = Column(Integer, index=True, comment='permission id')
    data_rule_ids = Column(Text, comment='data rule ids')

# (other models omitted)

2.3 Basic Operations

2.3.1 Model operation

These operations mainly cover join queries, single-table multi-condition queries, paginated queries, and so on.

  • Multi-condition query on a single table, converting the result to a dictionary
import asyncio
import traceback

from sqlalchemy import and_

if __name__ == '__main__':
    async def main():
        import gino
        try:
            await db.set_bind(
                'postgres://postgres:123456@127.0.0.1:5432/zyxadminsystem',
                ssl=None,
                statement_cache_size=0,
                pool_class=Pool,
            )
            obj = await SysUser.query.where(and_(SysUser.id == 1, SysUser.realname == 'superadmin')).gino.one_or_none()
            print(obj.to_dict())
        except Exception:
            traceback.print_exc()

    asyncio.get_event_loop().run_until_complete(main())

The output is:

{'del_flag': 0, 'password': '1856555555', 'third_type': '', 'phone': '11', 'telephone': '', 'email': '[email protected]', 'client_id': '', 'create_by': '', 'sex': -5, 'org_code': 'A01A01', 'birthday': datetime.datetime(2020, 12, 16, 16, 0), 'third_id': '', 'username': 'administrator 22222', 'realname': 'super administrator', 'description': '', 'del_time': None, 'update_by': 'superadmin', 'post': 'general manager', 'salt': '123465', 'work_no': '123456', 'status': 1, 'depart_ids': '', 'position_name': 'R&D manager', 'create_time': datetime.datetime(2020, 12, 18, 17, 50, 17), 'avatar': 'xxxxxxx', 'update_time': None, 'user_identity': 1, 'id': 1, 'rel_tenant_ids': ''}
  • Join query
results = await SysUserDepart.join(SysDepart, SysUserDepart.dep_id == SysDepart.id).select().where((SysDepart.del_flag == 0) & (SysUserDepart.dep_id == 1)).order_by(SysUserDepart.user_id).gino.all()
print(results)
for i in results:
    print(i.dep_id)
  • Subquery
j = join(SysUserDepart, SysUser, SysUserDepart.user_id == SysUser.id)
results = await SysUserDepart.query.where(SysUser.id==1).select_from(j).order_by(SysUserDepart.user_id).gino.all()

  • Simple queries
# Query all matching rows
results = await SysUserRole.query.where(SysUserRole.user_id == 1).gino.all()
results = await SysUser.get_all(SysUser.id == 1)
# Return a single value
where = and_(SysUser.id == 1, SysUser.username == 'superadmin')
obj = await SysUser.get_one_or_none(where)
# Return all and sort
results = await SysUser.query.where(SysUser.id == 1).order_by(SysUser.id).gino.all()
  • Fuzzy query
 obj = await SysUser.query.where(and_(SysUser.id == 1, SysUser.username.contains('super22'))).gino.one_or_none()        
  • Query specific fields of a table
# 1
obj = await SysUser.select('username').gino.first()
  
# 2
cols = [c.name for c in SysUserDepart.__table__.c if c.name != 'large_field']
print(cols)
cards = await SysUserDepart.select(*cols).where(SysUserDepart.id == 1).gino.all()

PS: selecting specific fields returns tuples, so to_dict cannot be used on the result.
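When only some fields are selected, the rows can still be turned into dictionaries by pairing them with the column names. A minimal sketch, assuming the rows come back as tuple-like sequences in the same order as the selected columns (rows_to_dicts and the sample data are hypothetical, not a Gino API):

```python
from typing import Any, Dict, Iterable, List, Sequence


def rows_to_dicts(columns: Sequence[str], rows: Iterable[Sequence[Any]]) -> List[Dict[str, Any]]:
    """Pair each row's values with the column names, in selection order."""
    return [dict(zip(columns, row)) for row in rows]


# Hypothetical data: what a two-column select might return
cols = ["user_id", "dep_id"]
rows = [(1, 10), (1, 11)]
print(rows_to_dicts(cols, rows))
```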

  • More complex join queries
_result_user_depart = await SysUserDepart \
.join(SysDepart, SysUserDepart.dep_id == SysDepart.id) \
.select() \
.where((SysDepart.del_flag == 0) & (SysUserDepart.user_id == 1)) \
.order_by(SysUserDepart.user_id).gino.all(return_mode=True)
  • Counting records
_result_user_depart = await SysUser.store_count()
# Attach a count column to the model and select it
setattr(SysUser, 'count', func.count(SysUser.id).label('count'))
print(await SysUser.select('count').where(SysUser.id==1).gino.first())
# Outer join query
query = SysUserDepart.outerjoin(SysDepart, SysUserDepart.dep_id == SysDepart.id).select().where((SysDepart.del_flag == 0) & (SysUserDepart.user_id == 1)).order_by(SysUserDepart.user_id)
  • Using db.select
query = db.select([SysUserDepart])
rows = await query.gino.all()
  • Using loaders to convert rows into models
###==========
from gino.loader import ModelLoader
query = db.select([SysUserDepart])
query = query.execution_options(loader=ModelLoader(SysUserDepart))
users = await query.gino.all()
###==========
query = db.select([SysUserDepart])
query = query.execution_options(loader=SysUserDepart)
users = await query.gino.all()
###==========
query = db.select([SysUserDepart])
users = await query.gino.load(SysUserDepart).all()
##========
query = SysUserDepart.join(SysDepart, SysUserDepart.dep_id == SysDepart.id).select().where((SysDepart.del_flag == 0) & (SysUserDepart.user_id == 1)).order_by(SysUserDepart.user_id)
parents = await query.gino.load(SysUserDepart.distinct(SysUserDepart.id).load(add_child=SysDepart)).all()
print('parents', parents)
for isd in parents:
    print(dir(isd))
    print(isd.children)
    for ch in isd.children:
        print("department name:", ch.depart_name)
##========
print("Selecting specific fields returns tuples; with a loader the rows become models")
query = db.select([SysUser.username, SysUser.phone])
users = await query.gino.load(SysUser).all()
print("users", users)
for isdh in users:
    print(isdh.username)
    print(isdh.phone)
  • Analysis of SQL:
print('analyze', await db.scalar(db.exists().where(SysUser.id == 1).select()))

2.3.2 Executing raw SQL

query = db.text('SELECT * FROM sys_user WHERE id = :id_val')
row = await db.first(query, id_val=1)
print("SSSSSSSSSSSSSSSSS",row)

Going one step further, consider paginating a join query: I only want some fields from a table, the query needs a join, and the results need to be paginated. The simple pagination effect is shown in the figure:

I had a hard time doing this with Gino! Maybe I just do not know which loader to use next. Even using db.select already felt laborious.
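Whatever layer ends up running the query, the paginated join described above boils down to a LIMIT/OFFSET computation on top of a join that selects only a few fields. A minimal sketch in plain SQL text, reusing the table and column names from the examples above (the helper names page_to_limit_offset and paginated_join_sql are hypothetical):

```python
from typing import Tuple


def page_to_limit_offset(page_no: int, page_size: int) -> Tuple[int, int]:
    """Translate a 1-based page number into (limit, offset)."""
    if page_no < 1 or page_size < 1:
        raise ValueError("page_no and page_size must be >= 1")
    return page_size, (page_no - 1) * page_size


def paginated_join_sql(page_no: int, page_size: int) -> str:
    """Build a paginated join that selects only two fields."""
    limit, offset = page_to_limit_offset(page_no, page_size)
    # limit and offset are validated integers, so formatting them in is safe here
    return (
        "select sys_user_depart.user_id, sys_depart.depart_name "
        "from sys_user_depart "
        "left join sys_depart on sys_user_depart.dep_id = sys_depart.id "
        "where sys_depart.del_flag = 0 "
        "order by sys_user_depart.user_id "
        f"limit {limit} offset {offset}"
    )


print(paginated_join_sql(page_no=3, page_size=20))
```

A string like this could then be wrapped in db.text and executed the same way as the raw SQL in section 2.3.2.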

Gino itself is built on asyncpg, so I figured I could put together a simple ORM of my own on top of asyncpg, within the limits of what I understand.

So I tried wrapping a little something up as practice:

2. Self-made simple asynchronous ORM based on Asyncpg

The simple structure is:

To put it bluntly, it all comes down to assembling the SQL statement through chained method calls!
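The chaining idea can be shown in a few lines: every builder method records some state and returns self, and a final call renders the SQL. This is a simplified, hypothetical sketch (TinyQuery is not the class used in this article, and it takes ready-made condition strings rather than building them):

```python
from typing import List


class TinyQuery:
    """Hypothetical chainable builder: each method stores state and returns self."""

    def __init__(self) -> None:
        self._table = ""
        self._fields: List[str] = []
        self._where: List[str] = []

    def table(self, name: str) -> "TinyQuery":
        self._table = name
        return self

    def select(self, *fields: str) -> "TinyQuery":
        self._fields.extend(fields)
        return self

    def where(self, condition: str) -> "TinyQuery":
        self._where.append(condition)
        return self

    def sql(self) -> str:
        # Default to all columns when no field was selected
        fields = ", ".join(self._fields) or "*"
        where = " where " + " and ".join(self._where) if self._where else ""
        return f"select {fields} from {self._table}{where}"


print(TinyQuery().table("sys_user").select("id", "username").where("id = 1").sql())
```

Returning self from every method is what allows the calls to be strung together in one expression.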

2.1 SQL encapsulation

models.base mainly provides unified wrappers for the common statement types:

  • Conditional (where) statements
  • Query statements
  • Limit statements
  • Grouping statements
  • Sorting statements
  • Returning statements for queries or inserts
  • Join query statements

and so on.

2.1.1 base.py: wrappers for the common base query conditions:

from dataclasses import dataclass

@dataclass
class Base:
    # Name of the selected table
    select_table = ''
    __where__ = []
    # Fields to retrieve
    __select_fields__ = []
    __sql__ = None
    __lock__ = None  # lock
    __groupby__ = []  # grouping fields
    __orderby__ = []  # sorting fields
    __join__ = []  # left join
    __join_on_exp__ = []  # JOIN ... ON expressions
    # Global total count for queries
    __count__ = []
    # PostgreSQL RETURNING clause
    __returning__ = []

    __offset__ = 0
    __limit__ = 0
    __ispaginate_by__ = False

    # Common expression operators
    operators = [
        '=', '<', '>', '<=', '>=', '<>', '!=',
        'like', 'like binary', 'not like', 'between', 'not between',
        '&', '|', '^', '<<', '>>',
        'rlike', 'regexp', 'not regexp',
        '~', '~*', '!~', '!~*', 'similar to',
        'not similar to', 'not ilike', '~~*', '!~~*', 'in', 'not in'
    ]

    # Set the name of the table to query
    def tablle(self, select_table):
        self.select_table = select_table
        # Reset all previously collected state
        self.__reset__()
        return self

    def __reset__(self):
        # Reset all expressions
        self.__where__.clear()
        self.__select_fields__.clear()
        self.__groupby__.clear()  # grouping fields
        self.__orderby__.clear()  # sorting fields
        self.__join__.clear()  # join
        self.__join_on_exp__.clear()  # join
        self.__sql__ = None
        self.__returning__.clear()
        # Limit
        self.__limit__ = 0
        # Offset
        self.__offset__ = 0
        # Lock
        self.__lock__ = None

        self.__ispaginate_by__ = False


    def _format_columns(self, columns):
        return list(map(lambda index: index, columns))

    def format_column(self, columns=None):
        return '`{}`'.format(columns)

    def format_string(self, columns):
        return "'{}'".format(columns)

    def _compile_dict(self, data):
        return ['{}={}'.format(index, self.format_string(value)) for index, value in data.items()]

    def _compile_lock(self):
        return '' if not self.__lock__ else self.__lock__

    # Row lock via FOR UPDATE (a pessimistic lock)
    def lock_by_for_update(self):
        self.__lock__ = ' for update'
        return self

    def sql(self):
        return self.__sql__

    def list_to_str(self, data):
        if data and isinstance(data, list):
            # Convert the list to its string form
            return '({})'.format(data[0]) if len(data) == 1 else tuple(data).__str__()
        if data and isinstance(data, tuple):
            return data.__str__()
        else:
            raise Exception('Invalid argument!')

    def list_to_str_no_format(self, data):
        '''
        Convert a list to its string form
        :param data:
        :return:
        '''
        if data and isinstance(data, list):
            return '{}'.format(data[0]) if len(data) == 1 else tuple(data).__str__()
        if data and isinstance(data, tuple):
            return data.__str__()
        else:
            raise Exception('Invalid argument!')

    def _compile_where(self):
        '''
        Combine the collected conditions into a where clause
        :return:
        '''
        # If there are any conditions
        if len(self.__where__) > 0:
            sqlstr = []
            # Iterate over all condition elements
            for index in self.__where__:
                # A dict condition
                if isinstance(index, dict):
                    # Join the pairs with "and"
                    sqlstr.append(' and '.join(self._compile_dict(index)))
                # A tuple condition
                elif isinstance(index, tuple):
                    sqlstr.append(self._compile_tuple(index))
                elif isinstance(index, str):
                    sqlstr.append(index)
            return ' where {}'.format(' and '.join(sqlstr))
        return ''

    def _compile_groupby(self):
        # Add the grouping fields if any were given
        return '' if len(self.__groupby__) == 0 else ' group by ' + ','.join(self.__groupby__)

    def _compile_orderby(self):
        # Join the sorting fields
        return '' if len(self.__orderby__) == 0 else ' order by ' + ','.join(self.__orderby__)

    def offset(self, number: int):

        if self.__limit__ < 0:
            raise Exception('limit must not be less than 0')

        if not isinstance(number, int):
            raise Exception('offset must be an integer!')

        if number < 0:
            raise Exception('offset must not be less than 0')

        self.__offset__ = int(number)
        return self

    def limit(self, number: int):
        if not isinstance(number, int):
            raise Exception('limit must be an integer!')
        if number < 0:
            raise Exception('limit must not be less than 0')
        self.__limit__ = int(number)
        return self

    def paginate(self, page_no: int,page_size:int):
        self.__ispaginate_by__ = True
        self.limit(page_size)
        self.offset((page_no-1)*page_size)
        return self

    def _compile_limit_offset(self):
        # When paginate() was not called
        if not self.__ispaginate_by__:
            __offset__sql = '' if not self.__offset__ else ' offset {}'.format(self.__offset__)
        else:
            # When paginate() was called
            __offset__sql = ' offset {}'.format(self.__offset__)

        return '' if not self.__limit__ else ' limit {}{}'.format(self.__limit__, __offset__sql)


    # Join queries
    def _compile_leftjoin(self):
        if self.__join__:
            return ' ' + ' '.join([v for v in self.__join__])
        return ''

    def _compile_tuple(self, data):
        # Special handling for "in"-style where conditions
        if data[1] in ['in', 'not in']:
            # Convert the list in data[2] to a string
            return '{} {} {}'.format(data[0], data[1], self.list_to_str(data[2]))
        # Special handling for "between" expressions
        elif data[1] in ['between', 'not between']:
            # The last argument must contain exactly two values
            if not (len(data) == 3 and len(data[2]) == 2):
                raise Exception('Invalid between expression: three arguments are required and the last one must contain two values')
            return '{} {} {} and {}'.format(data[0], data[1], self.format_string(data[2][0]), self.format_string(data[2][1]))

        return '{} {} {}'.format(data[0], data[1], self.format_string(data[2]))

    def groupby(self, *args):
        # Convert the arguments to a list
        self.__groupby__ = self._format_columns(list(args))
        return self

    def returning(self, column):
        # Accept either a list of columns or a single column
        if isinstance(column, list):
            self.__returning__ = column
        else:
            self.__returning__.clear()
            self.__returning__.append(column)
        return self

    def orderby(self, column, direction='asc'):
        if direction.lower() == 'asc':
            self.__orderby__.append('{} {}'.format(column, direction))
        else:
            self.__orderby__.append('{} {}'.format(column, 'desc'))
        return self

    def _compile_returning(self):
        if self.__returning__:
            # The leading space keeps the clause separate from the preceding SQL
            return ' returning {}'.format(','.join([v for v in self.__returning__]))
        return ''

    def where(self, *args):
        length = args.__len__()
        # A single dict argument is treated as equality conditions
        if length == 1 and isinstance(args[0], dict):
            self.__where__.append(args[0])
        # Two arguments, (column, value), are treated as equality
        elif length == 2:
            self.__where__.append({args[0]: args[1]})
        # Three arguments form an expression: (column, operator, value)
        # Check that the second argument is a known operator
        elif length == 3:
            if args[1] in self.operators:
                # Equality
                if args[1] == '=':
                    self.__where__.append({args[0]: args[2]})
                else:
                    # Any other operator
                    self.__where__.append((args[0], args[1], args[2]))
            else:
                raise Exception('Operator is not in the predefined list: "{}"'.format(args[1]))
        elif length == 1 and isinstance(args[0], str):
            self.__where__.append(args[0])
        else:
            raise Exception('Invalid condition arguments')
        return self


    def join_on_exp(self, *args):

        # Each entry in __join_on_exp__ is a three-element tuple
        length = args[0].__len__()
        if isinstance(args[0], set):
            raise Exception('__join_on_exp__ does not support set arguments')
        elif isinstance(args[0], list):
            # A list of several conditions
            for item in args[0]:
                self.__join_on_exp__.append(item)
        elif isinstance(args[0], dict):
            for key, value in args[0].items():
                self.__join_on_exp__.append((key, '=', value))
        # A single raw string expression
        elif length == 1 and isinstance(args[0][0], str):
            # Still stored as a three-element tuple
            self.__join_on_exp__.append((args[0][0], '', ''))
        # A single dict wrapped in a sequence
        elif length == 1 and isinstance(args[0][0], dict):
            # Several equality pairs
            for key, value in args[0][0].items():
                self.__join_on_exp__.append((key, '=', value))
        # Two elements
        elif length == 2:
            # If the first element contains a "?" placeholder, substitute the second element into it
            if '?' in args[0][0]:
                # Still stored as a three-element tuple
                self.__join_on_exp__.append(("{}".format(args[0][0].replace('?', args[0][1])), '', ''))
            else:
                # Without a placeholder, the two elements are joined with "="
                self.__join_on_exp__.append((args[0][0], '=', args[0][1]))
        # Three elements form an expression: (left, operator, right)
        # Check that the second element is a known operator
        elif length == 3:
            if args[0][1] in self.operators:
                # Equality
                if args[0][1] == '=':
                    self.__join_on_exp__.append((args[0][0], '=', args[0][2]))
                else:
                    # Any other operator
                    self.__join_on_exp__.append((args[0][0], args[0][1], args[0][2]))
            else:
                raise Exception('__join_on_exp__ operator is not in the predefined list: "{}"'.format(args[0][1]))
        else:
            raise Exception('__join_on_exp__: invalid expression arguments')
        return self

    def _compile_on(self):

        # if not self.__join_on_exp__:
        #     raise Exception('Join query is missing the __join_on_exp__ expression!')
        sqlstr = ['{} {} {}'.format(index[0], index[1], index[2]) for index in self.__join_on_exp__]

        return ' and '.join(sqlstr)

    def leftjoin_str(self, join_tablename, on: str):
        # if not  self.__join_on_exp__:
        #     raise Exception('__join_on_exp__ is missing the required JOIN ON expression!')
        # The ON string may contain several conditions
        self.__join__.append('left join {} on {}'.format(join_tablename, on))

        return self

    def innerjoin_tuple(self, join_tablename, on):
        self.join_on_exp(on)
        if not self.__join_on_exp__:
            raise Exception('Join query is missing the __join_on_exp__ expression!')
        self.__join__.append('inner join {} on {}'.format(join_tablename, self._compile_on()))
        self.__join_on_exp__.clear()  # join
        return self

    def leftjoin_tuple(self, join_tablename, on):
        self.join_on_exp(on)
        if not self.__join_on_exp__:
            raise Exception('Join query is missing the __join_on_exp__ expression!')
        self.__join__.append('left join {} on {}'.format(join_tablename, self._compile_on()))
        self.__join_on_exp__.clear()  # join
        return self

    def rightjoin_tuple(self, join_tablename, on):
        self.join_on_exp(on)
        if not self.__join_on_exp__:
            raise Exception('Join query is missing the __join_on_exp__ expression!')
        self.__join__.append('right join {} on {}'.format(join_tablename, self._compile_on()))
        self.__join_on_exp__.clear()  # join
        return self

    def full_outer_join_tuple(self, join_tablename, on):
        self.join_on_exp(on)
        if not self.__join_on_exp__:
            raise Exception('Join query is missing the __join_on_exp__ expression!')
        self.__join__.append('full outer join {} on {}'.format(join_tablename, self._compile_on()))
        self.__join_on_exp__.clear()  # join
        return self

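One thing to keep in mind with the base class above: values are formatted straight into the SQL text (format_string only wraps them in quotes), so a value that comes from user input can inject SQL. A hedged alternative sketch compiles the conditions into asyncpg-style positional placeholders ($1, $2, ...) and returns the values separately, so they can be passed as query arguments. The function compile_where_params is hypothetical, handles only simple (column, operator, value) triples, and still trusts the column names:

```python
from typing import Any, List, Sequence, Tuple

ALLOWED_OPERATORS = {"=", "<", ">", "<=", ">=", "<>", "!=", "like", "not like", "in", "not in"}


def compile_where_params(conditions: Sequence[Tuple[str, str, Any]]) -> Tuple[str, List[Any]]:
    """Build ' where ...' with $n placeholders plus the matching argument list."""
    parts: List[str] = []
    params: List[Any] = []
    for column, operator, value in conditions:
        op = operator.lower()
        if op not in ALLOWED_OPERATORS:
            raise ValueError(f"unsupported operator: {operator!r}")
        # "in" lists are passed as one array argument
        params.append(list(value) if op in ("in", "not in") else value)
        placeholder = f"${len(params)}"
        if op == "in":
            parts.append(f"{column} = any({placeholder})")
        elif op == "not in":
            parts.append(f"{column} <> all({placeholder})")
        else:
            parts.append(f"{column} {op} {placeholder}")
    sql = " where " + " and ".join(parts) if parts else ""
    return sql, params


sql, params = compile_where_params([("id", "not in", [1, 2]), ("username", "like", "%super%")])
print("select username from sys_user" + sql, params)
```

With asyncpg, the resulting text and arguments would be passed along the lines of conn.fetch(query, *params), which leaves the quoting of values to the driver.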

2.1.2 select.py: the selector wrapper:

from dataclasses import dataclass
from apps.ext.ormx.models.base import Base
from apps.ext.ormx.exceptions import ClientConfigBadException
from typing import List, Union, Tuple


@dataclass
class Select(Base):

    def select(self, *args):
        """
        Supports strings, tuples, and lists of arguments
        :param args:
        :return:
        """
        if len(args) == 1 and isinstance(args[0], str):
            if ',' in args[0]:
                for v in args[0].split(','):
                    self.__select_fields__.append(v.strip())
            else:
                self.__select_fields__.append(args[0])
        elif len(args) == 1 and isinstance(args[0], list):
            for v in args[0]:
                self.__select_fields__.append(v.strip() if isinstance(v, str) else v)
        elif len(args) > 1 and isinstance(args, tuple):
            for v in args:
                self.__select_fields__.append(v.strip() if isinstance(v, str) else v)
        print(self.__select_fields__)
        return self

    def do_select(self, isback___sql__=False):
        if not self.select_table:
            raise ClientConfigBadException(errmsg='Table name is not set!')
        # Default to all columns when no field was selected
        if not self.__select_fields__:
            self.__select_fields__.append('*')
        # subsql = ''.join([self._compile_where(), self._compile_whereor(), self._compile_orwhere(), self._compile_groupby(), self._compile_orderby(), self._compile_having(), self._compile_offset(), self._compile_lock()])
        subsql = ''.join([self._compile_where(), self._compile_groupby(), self._compile_orderby(), self._compile_lock(), self._compile_limit_offset()])
        joinsql = ''.join(self._compile_leftjoin())
        if self.__returning__:
            raise ClientConfigBadException(errmsg='SELECT is not allowed to call the returning function! Please check!')
        self.__sql__ = 'select {} from {}{}{}'.format(','.join(self.__select_fields__), self.select_table, joinsql, subsql)
        # returnsql = "select {} from {}{}{}".format(self._compile_limit(), ','.join(self.__select__), self._tablename(), joinsql, subsql)
        if isback___sql__:
            return self.__sql__
        return self

2.1.3 update.py: the updater wrapper:

from dataclasses import dataclass
from apps.ext.ormx.models.base import Base
from apps.ext.ormx.exceptions import ClientConfigBadException
import collections


@dataclass
class Update(Base):

    # Decrement
    def decrement(self, key, amount=1):
        if isinstance(amount, int) and amount > 0:
            data = collections.defaultdict(dict)
            data[key] = '{}-{}'.format(str(key), str(amount))
            self.__sql__ = self._compile_increment(data)
        return self

    # Increment
    def increment(self, key, amount=1):
        if isinstance(amount, int) and amount > 0:
            data = collections.defaultdict(dict)
            data[key] = '{}+{}'.format(str(key), str(amount))
            self.__sql__ = self._compile_increment(data)
        return self

    def _compile_update(self, data):
        if not self.select_table:
            raise ClientConfigBadException(errmsg='Table name is not set!')
        print("returning:", self.__returning__)
        return "update {} set {}{}{}".format(self.select_table, ','.join(self._compile_dict(data)), self._compile_where(), self._compile_returning())

    def _compile_increment(self, data):
        if not self.select_table:
            raise ClientConfigBadException(errmsg='Table name is not set!')
        subsql = ','.join(['{}={}'.format(str(index), value) for index, value in data.items()])
        return "update {} set {}{}{}".format(self.select_table, subsql, self._compile_where(), self._compile_returning())

    def update(self, data):
        if data and isinstance(data, dict):
            data = {key: value for key, value in data.items()}
            self.__sql__ = self._compile_update(data)
        return self

2.1.4 insert.py: the inserter wrapper:

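As a hedged illustration of what an inserter in the same style could produce (a standalone sketch, not the author's insert.py; compile_insert and its parameters are hypothetical), the function below builds an INSERT with PostgreSQL's RETURNING clause and asyncpg-style $n placeholders:

```python
from typing import Any, Dict, List, Tuple


def compile_insert(table: str, data: Dict[str, Any], returning: str = "") -> Tuple[str, List[Any]]:
    """Build a parameterized INSERT statement and its argument list."""
    if not table:
        raise ValueError("table name is required")
    if not data:
        raise ValueError("nothing to insert")
    columns = ", ".join(data.keys())
    # One positional placeholder per value: $1, $2, ...
    placeholders = ", ".join(f"${i}" for i in range(1, len(data) + 1))
    sql = f"insert into {table} ({columns}) values ({placeholders})"
    if returning:
        sql += f" returning {returning}"
    return sql, list(data.values())


sql, params = compile_insert("sys_user", {"username": "demo", "status": 1}, returning="id")
print(sql, params)
```

The values travel separately from the SQL text, so they never need to be quoted by hand.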

2.1.5 delete.py: the deleter wrapper:

from dataclasses import dataclass
from apps.ext.ormx.models.base import Base
from apps.ext.ormx.exceptions import ClientConfigBadException


@dataclass
class Delect(Base):

    def __columnize(self, columns):
        # Convert the dict keys to a tuple string, with each element wrapped in double quotes
        return tuple(columns).__str__().replace('\'', '"')

    def __valueize(self, data):
        # print("data", data)
        return ','.join([tuple(index.values()).__str__() for index in data])

    def delete(self):
        self.__sql__ = self._compile_delete()
        return self

    def _compile_delete(self):
        if not self.select_table:
            raise ClientConfigBadException(errmsg='Table name is not set!')
        return 'delete from {}{}{}'.format(self.select_table, self._compile_where(), self._compile_returning())

2.1.6 Core.py Database linker.py:

import traceback
from typing import Optional

import asyncpg
from dotmap import DotMap

from apps.ext.ormx.exceptions import ClientConfigBadException
from apps.ext.ormx.models.select import Select
from apps.ext.ormx.models.insert import Insert
from apps.ext.ormx.models.update import Update
from apps.ext.ormx.models.delete import Delect

# DSN = 'postgres://{0}:{1}@{2}:{3}/{4}'.format(db_user, db_pass, db_host, db_port, db_name,)


class ZyxOrm(Select, Insert, Delect, Update):

    def __init__(self, config):
        self.pool: Optional[asyncpg.Pool] = None
        self.config = config
        # if not self.config:
        #     raise ClientConfigBadException

    async def connect_pool(self,
                           # str, *,
                           max_queries=50000,
                           # Seconds after which an idle connection in the pool is closed.
                           # 0 disables the limit, so idle connections live forever.
                           max_inactive_connection_lifetime=300.0):
        try:
            if self.config:
                self.pool = await asyncpg.create_pool(
                    min_size=self.config.get('min_size', 5),
                    max_size=self.config.get('max_size', 5),
                    user=self.config.user,
                    host=self.config.host,
                    max_queries=max_queries,
                    max_inactive_connection_lifetime=max_inactive_connection_lifetime,
                    port=self.config.get('port', 5432),
                    password=self.config.password,
                    database=self.config.database)
            return self.pool
        except Exception:
            traceback.print_exc()

    # Get an available connection from the connection pool.
    async def get_conn(self):
        conn = await self.pool.acquire()
        return conn

    # Return a connection to the pool.
    # (The original body is missing here; releasing through the pool is an assumption.)
    async def close_conn(self, conn):
        await self.pool.release(conn)

    # Close the whole pool.
    async def close_pool(self):
        if self.pool is not None:
            await self.pool.close()
        else:
            raise ClientConfigBadException(errmsg='Database is not connected!')

    # execute and executemany run SQL statements without returning records.
    async def act_execute(self, *args, sql: str, timeout=None):
        async with self.pool.acquire() as conn:
            return await conn.execute(sql, *args, timeout=timeout)

    # execute and executemany run SQL statements without returning records.
    async def act_execute_many(self, *args, sql: str, timeout=None, isopen_transaction=False):
        async with self.pool.acquire() as conn:
            if isopen_transaction:
                async with conn.transaction():
                    await conn.executemany(sql, *args, timeout=timeout)
            else:
                await conn.executemany(sql, *args, timeout=timeout)

    # ---- Return every record that satisfies the condition.
    async def fetch_all(self, sql=None, *args, isdict=True, timeout=None, isopen_transaction=False):
        print("SQL:", self.__sql__)
        # The sql argument is only used when no statement has been built yet.
        if not self.__sql__ and sql:
            self.__sql__ = sql
        async with self.pool.acquire() as conn:
            # conn.fetch runs the statement and returns all matching records.
            if isopen_transaction:
                async with conn.transaction():
                    rows = await conn.fetch(self.__sql__, *args, timeout=timeout)
            else:
                rows = await conn.fetch(self.__sql__, *args, timeout=timeout)
            # return list(map(dict, rows)) if rows else None
            return (list(map(dict, rows)) if isdict else rows) if rows else None

    # ---- Return the first record that satisfies the condition.
    async def fetch_get(self, *args, isdict=True, timeout=None, isopen_transaction=False):
        print("SQL:", self.__sql__)
        if self.__sql__:
            async with self.pool.acquire() as conn:
                # conn.fetchrow runs the statement and returns only the first record.
                if isopen_transaction:
                    async with conn.transaction():
                        row = await conn.fetchrow(self.__sql__, *args, timeout=timeout)
                else:
                    row = await conn.fetchrow(self.__sql__, *args, timeout=timeout)
                return (dict(row.items()) if isdict else row) if row else None

    # ---- Run raw SQL and return the records.
    # Only a single statement can be executed per call, which helps guard against SQL injection.
    async def raw_sql_fetch_all(self, sql, *args, isdict=True, timeout=None, isopen_transaction=False):
        # The connection is taken from the pool inside fetch_all.
        return await self.fetch_all(sql, *args, isdict=isdict, timeout=timeout,
                                    isopen_transaction=isopen_transaction)
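The fetch methods above only consume self.__sql__; building that string is left to the Select, Insert, Update and Delect mixins. As a rough illustration of how a chainable builder could fill __sql__ while keeping values out of the SQL text, here is a minimal sketch. MiniSelect and its params attribute are hypothetical names, not the article's implementation, and the sketch only handles the three-argument form of where. The $1, $2 placeholders are the positional style that asyncpg expects.

```python
from typing import Any, List, Tuple


class MiniSelect:
    """Hypothetical stand-in for a Select mixin (illustration only)."""

    def __init__(self) -> None:
        self._table = ""
        self._fields: List[str] = []
        self._wheres: List[Tuple[str, str, Any]] = []
        self.__sql__ = ""
        self.params: List[Any] = []

    def tablle(self, name: str) -> "MiniSelect":
        # Starting a new statement clears whatever was collected before.
        self._table = name
        self._fields = []
        self._wheres = []
        self.__sql__ = ""
        self.params = []
        return self

    def select(self, *fields: str) -> "MiniSelect":
        self._fields.extend(fields)
        return self

    def where(self, field: str, op: str, value: Any) -> "MiniSelect":
        self._wheres.append((field, op, value))
        return self

    def do_select(self) -> "MiniSelect":
        # Values become $n placeholders; they are passed separately as params.
        # Table, field and operator names are NOT escaped in this sketch.
        columns = ", ".join(self._fields) or "*"
        sql = f"SELECT {columns} FROM {self._table}"
        params: List[Any] = []
        clauses: List[str] = []
        for field, op, value in self._wheres:
            params.append(value)
            clauses.append(f"{field} {op} ${len(params)}")
        if clauses:
            sql += " WHERE " + " AND ".join(clauses)
        self.__sql__ = sql
        self.params = params
        return self


query = (
    MiniSelect()
    .tablle("sys_user")
    .select("username")
    .where("id", "=", 1)
    .where("username", "like", "%super%")
    .do_select()
)
print(query.__sql__)  # SELECT username FROM sys_user WHERE id = $1 AND username like $2
print(query.params)   # [1, '%super%']
```

With a builder shaped like this, a fetch method could call conn.fetch(self.__sql__, *self.params), so user input never becomes part of the SQL string.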

2.2 Usage:

import asyncio
import traceback

from dotmap import DotMap


async def main():
    myorm = ZyxOrm(config=DotMap(min_size=5, max_size=10, user='postgres', host='localhost',
                                 port=5432, password="123456", database="zyxadminsystem"))
    await myorm.connect_pool()
    try:
        # Query one record, all fields
        _restlt = await myorm \
            .tablle('sys_user') \
            .where('id', '=', 1) \
            .where('username', 'like', '%super%') \
            .do_select() \
            .fetch_get()
        print("query, all fields:", _restlt)

        # Query one record, selected field, returned as a dict
        _restlt = await myorm \
            .tablle('sys_user') \
            .select('username') \
            .where('id', '=', 1) \
            .where('username', 'like', '%super%') \
            .do_select() \
            .fetch_get()
        print("query, returns a dict:", _restlt)

        # where ... not in
        _restlt = await myorm \
            .tablle('sys_user') \
            .select('username') \
            .where('id', 'not in', [1, 2]) \
            .do_select() \
            .fetch_all()
        print("where not in:", _restlt)

        # Left joins with group by and order by
        _restlt = await myorm \
            .tablle('sys_user_role') \
            .select('sys_user_role.user_id') \
            .select('sys_user.realname') \
            .leftjoin_tuple('sys_user', on=('sys_user_role.user_id', '=', 3)) \
            .leftjoin_tuple('sys_user_depart', on=('sys_user_depart.user_id', 'sys_user.id')) \
            .groupby('sys_user_role.user_id', 'sys_user.realname') \
            .orderby('sys_user.realname', 'desc') \
            .do_select() \
            .fetch_all()
        print("left join:", _restlt)

        # Left joins with several "on" conditions
        _restlt = await myorm \
            .tablle('sys_user') \
            .select('sys_user_role.user_id') \
            .select('sys_user.*') \
            .leftjoin_tuple(join_tablename='sys_user_role', on=('sys_user_role.user_id', '=', 3)) \
            .leftjoin_tuple(join_tablename='sys_user_depart',
                            on=[('sys_user_role.user_id', '=', 3),
                                ('sys_user_depart.user_id', '=', 'sys_user.id')]) \
            .leftjoin_tuple(join_tablename='sys_user_tenant',
                            on=('sys_user_tenant.name =?', 'sys_user.username')) \
            .do_select() \
            .fetch_all()
        print("left join, multiple conditions:", _restlt)

        # SQL functions in the select list
        _restlt = await myorm \
            .tablle('sys_user') \
            .select('max(id), max(create_time)') \
            .where('username', 'like', '%admin%') \
            .do_select() \
            .fetch_get()
        print("using functions:", _restlt)

        # Subquery: build the inner SQL first, then embed it
        susadsql = myorm \
            .tablle('sys_user') \
            .select('id') \
            .where('create_time', '>=', '2020-07-08 09:12:37') \
            .do_select(isback___sql__=True)
        print("subquery SQL:", susadsql)
        _restlt = await myorm \
            .tablle('sys_user') \
            .select('count(id)') \
            .where('id in ({})'.format(susadsql)) \
            .do_select() \
            .fetch_get()
        print("subquery with a function:", _restlt)

        # Insert one record from a dict
        _restlt = myorm.tablle('sys_user_role').create({'user_id': 34, 'role_id': 324})
        # Insert several records from a list of dicts
        # (the second role_id value is missing here)
        # _restlt = myorm.tablle('sys_user_role').create([{'user_id': 34, 'role_id': 324}, {'user_id': 134, 'role_id': …}])
        # Insert several records from a field list and a value list
        # (the second value row is cut off here)
        # _restlt = myorm.tablle('sys_user_role').insert(['user_id', 'role_id'], [['35', '4554'], ['56', …]])

        # Delete
        resiasda = myorm.tablle('sys_user_role').where('id', 11).delete()
        print("delete:", resiasda.__sql__)

        # Update (the field names and values are not recoverable here)
        print("update 》》》》》" * 5)
        # resiasda = myorm.tablle('sys_user').where('id', 1).returning(…).update({…})
        # print("update:", resiasda.__sql__)

        # Decrement a column (the remaining arguments are missing here)
        # resiasda = myorm.tablle('sys_user').where('id', 1).decrement('sex', …)
        # print("decrement:", resiasda.__sql__)

        # Row lock with FOR UPDATE
        resiasda = myorm \
            .tablle('sys_user') \
            .select('username') \
            .where('id', '=', 1) \
            .where('username', 'like', '%administrator%') \
            .lock_by_for_update() \
            .limit(1) \
            .do_select()
        print("lock for update:", resiasda.sql())

        # limit and offset
        resiasda = myorm \
            .tablle('sys_user') \
            .lock_by_for_update() \
            .limit(0) \
            .offset(2) \
            .do_select()
        print("limit and offset:", resiasda.sql())

        # Paging query
        resiasda = myorm \
            .tablle('sys_user') \
            .paginate(1, 1) \
            .do_select()
        print("paging query SQL:", resiasda.sql())
        resiasda = await resiasda.fetch_all()
        print("paginate:", resiasda)
    except Exception:
        traceback.print_exc()


if __name__ == '__main__':
    asyncio.run(main())
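The usage above ends with limit, offset and paginate(1, 1). How paginate maps onto the SQL is not shown, so the following is only a sketch under the assumption that its two arguments are a 1-based page number and a page size. The helper names page_to_limit_offset and paginate_sql are hypothetical.

```python
from typing import Tuple


def page_to_limit_offset(page: int, page_size: int) -> Tuple[int, int]:
    """Convert a 1-based page number into a (limit, offset) pair."""
    if page < 1 or page_size < 1:
        raise ValueError("page and page_size must be positive integers")
    return page_size, (page - 1) * page_size


def paginate_sql(base_sql: str, page: int, page_size: int) -> str:
    """Append LIMIT/OFFSET to an already built SELECT statement."""
    limit, offset = page_to_limit_offset(page, page_size)
    # Both values are validated integers, so formatting them in is safe.
    return f"{base_sql} LIMIT {limit} OFFSET {offset}"


print(paginate_sql("SELECT * FROM sys_user", 1, 1))
# SELECT * FROM sys_user LIMIT 1 OFFSET 0
print(paginate_sql("SELECT * FROM sys_user", 3, 20))
# SELECT * FROM sys_user LIMIT 20 OFFSET 40
```

Paging is only stable when the statement also has an ORDER BY, otherwise PostgreSQL may return rows in a different order from one page to the next.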

Conclusion:

Compared with other ORMs, the usage is basically the same. Later, this can be combined with Pydantic: generate a Pydantic model for each SQL table and use it to validate fields and data. (The tool that generates the Pydantic models from the tables can be adapted later.)
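To make the Pydantic idea concrete, here is a minimal sketch of validating a row dict such as the one fetch_get returns. The SysUserRow model and the validate_row helper are hypothetical; the column names come from the queries above, but the column types are assumptions about the real table.

```python
from typing import Optional

from pydantic import BaseModel, ValidationError


class SysUserRow(BaseModel):
    # Assumed subset of sys_user columns; adjust names and types to the real table.
    id: int
    username: str
    realname: Optional[str] = None


def validate_row(row: dict) -> Optional[SysUserRow]:
    """Return a validated model, or None when the row does not fit the schema."""
    try:
        return SysUserRow(**row)
    except ValidationError:
        return None


good = validate_row({"id": 1, "username": "super", "realname": None})
bad = validate_row({"id": "not-a-number", "username": "super"})
print(good)
print(bad)
```

Returning None keeps the sketch short; in real code it is usually more useful to log or re-raise the ValidationError so the failing field is visible.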

Whether to use an ORM is a matter of opinion. Your colleagues, the business requirements and the complexity of the project all play a part, so weigh them for yourself.

The above is just a set of simple practice notes based on my own needs. If there are mistakes, criticism and corrections are welcome. Thank you!

At the end

Simple notes! For reference only!

END

Jianshu: www.jianshu.com/u/d6960089b…

Juejin: juejin.cn/user/296393…

WeChat official account: search for [children to a pot of wolfberry wine tea]

Let students | article | [original] [welcome to learn and exchange together] QQ: 308711822