1. The opening

Test truth in practice; develop truth in practice.

After working through a whole series of study notes on the FastAPI framework, I found that some knowledge points are easy to forget without practical application, while others are hard to understand and use. So I spent quite a stretch of time migrating a complete internal backend service from Flask to FastAPI.

In the process of migration, I realized that using Python's asyncio still takes plenty of practice before you can deeply understand and apply asynchronous IO, and I accumulated some experience with the framework along the way.

In order to test my knowledge and study and use the framework more systematically, and to keep the promise of a practical write-up I made to the followers of my public account (not many of them, but I am grateful all the same, since what I write feels messy and rough at best, haha), I decided to set aside the time to build a scaffold that ties these knowledge points together into a system.

2. Content summary

The related articles mainly walk through, hands-on, the processes involved in our daily API building. It really boils down to three categories: development, testing, and deployment:

  • Construction of the development environment (Docker environment setup)

  • Notes on API design conventions

  • Organizing the scaffold's function points

    • Project configuration file processing
    • API logging processing
    • Asynchronous Redis cache processing
    • Synchronous database integration and use
    • Asynchronous database integration and use
    • Global error and exception handling
    • Global HTTP request/response packet processing
    • Extending third-party plug-ins – rate limiter
    • Extending third-party plug-ins – error statistics handling
    • Extending third-party plug-ins – global JWT authentication
    • Extending third-party plug-ins – message queue integration
    • API version planning and processing
    • API-related unit testing and an introduction to performance analysis
    • If possible, a K8s deployment (I'm still learning it)
  • Finally, the deployment of a complete API service, mainly combining Docker and Drone

The content may change from time to time, but overall it will still revolve around the outline above.

The points above are roughly what I will sort out in the coming articles. Given my limited powers of expression, I may not get some ideas across clearly; I hope you will bear with me and point out anything that is wrong.

PS: the scaffold is purely the accumulated result of continuously refining my personal experience, so it is for reference only! If there are mistakes, corrections from you experts are welcome!

3. About scaffolding

3.1 Preface

First of all, after learning the series of basics covered earlier, applying the framework to our own business environment requires proper integration and encapsulation so that it becomes more convenient for business development.

How you package and integrate a scaffold of your own depends on your preferred style; in addition, if your company has relevant specification requirements, then you can only design according to those requirements.

In general, though, I personally feel a scaffold cannot do without a few elements. In my case, that means encapsulating several commonly used functions (essentially the points mentioned above):

  • Support for Swagger interface document generation (FastAPI has this natively)

  • Support for log collection – I use Loguru for logging here

  • An extension supporting JWT interface authentication

  • Cross-origin (CORS) support for the relevant interfaces

  • Support for middleware extensions (though reading information such as the body from the Request inside FastAPI middleware is problematic and needs to be weighed against your own requirements)

  • Parsing of configuration files, if you prefer file-based configuration

  • A globally uniform response body (whether error or normal)

  • Custom traceid link tracing for internal logs (including third-party request handling)

  • Interface rate limiting (based on Redis)

  • Global exception capture, with optional notification

  • Sentry exception capture and upload handling

3.2 Overall structure description of scaffolding

If the project is a single project rather than a large one, there is no need to extract the public modules; if there is a public module that must be shared, then by all means extract it. I am not going to extract one here.

The following is the overall structure of my scaffold:

PS: a purely personal way of organizing things, for reference only.

3.3 Planning a small example case based on the scaffold structure:

  • Use asynchronous mode to call a third-party interface (a weather forecast interface)
  • Create a simple user management system based on Vue (using someone else's Vue template is fastest)
  • The long-term plan is to build a data backend system

2. Construction starts

Building a scaffold starts from the planning of our project structure (which of course depends on your own organization). The screenshot below is the structure diagram I planned myself:

After planning the project structure, the next step is to start defining our own FastAPI object and to initialize the base object with some configuration information.

In Flask we were used to initializing our objects in the factory pattern, so we define them the same way here:

From the above you can roughly see which accessories our whole application needs at startup!

2.1 Description of Global Configuration Files

Since some configuration information is involved in starting the APP object, we need to manage it in a unified way. To keep things tidy, I went as far as splitting the configuration by function:

As the picture above shows, there are:

  • Configuration information about global authentication
  • Documentation configuration information
  • Database configuration information
  • Redis configuration information

PS: in production environments, depending on your needs, important information should be supplied by writing it to environment variables for security! This can be combined with reading environment variables to resolve the corresponding configuration.

python-dotenv is a library worth learning about!
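As a small illustration of the env-var approach (python-dotenv's `load_dotenv()` would populate `os.environ` from a `.env` file; the variable names and the password value here are just examples):

```python
import os

# Simulate what the deployment environment (or a .env file loaded with
# python-dotenv's load_dotenv()) would provide
os.environ.setdefault("REDIS_PASSWORD", "s3cret")

# Read with safe defaults, converting types explicitly
redis_url = os.environ.get("REDIS_URL", "redis://127.0.0.1:6379/0")
redis_password = os.getenv("REDIS_PASSWORD", "")
redis_db = int(os.getenv("REDIS_DB", "0"))
```

This keeps secrets out of the repository while the config classes below still expose typed defaults.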

The following configuration files are available:

auth_conf.py:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
File name: auth_conf.py
Description: authentication-related configuration
Creator: Xiaozhong
Creation time: 2021/6/9
-------------------------------------------------
"""
from functools import lru_cache
from typing import List

from pydantic import BaseSettings


class AuthUrlSettings(BaseSettings):
    # JWT signing algorithm
    JWT_ALGORITHM: str = "HS256"
    # JWT_SECRET_KEY: str = secrets.token_urlsafe(32)
    JWT_SECRET_KEY: str = '09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7'
    # Token validity configuration
    JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 30  # placeholder; the original value was garbled in the source
    JWT_REFRESH_EXPIRES_DAYS: int = 1
    # Cross-origin settings
    ORIGINS: List[str] = ["*"]

    class Config:
        env_file = ".env"
        case_sensitive = True
        env_file_encoding = 'utf-8'


# Routes on the whitelist that skip authentication
ADMIN_WHILE_ROUTE = [
    # '/sys/user/logout',
    '/5gmsg/sys/user/login',
    '/nw/sys/user/login',
    '/',
    '/check',
    '/check23',
    '/jcg_admin/api/v1/login',
    '/websocket/1',
    '/openapi_url',
    '/nw/sys/user/login',
    '/nw/sys/user/loginceshi'
]


@lru_cache()
def get_auth_settings():
    return AuthUrlSettings()


auth = get_auth_settings()

docs_conf.py (used when initializing our FastAPI object):

from functools import lru_cache

from pydantic import BaseSettings


class DocsSettings(BaseSettings):
    """Interface documentation configuration."""
    # API prefix; the original value was garbled in the source, '/api/v1' is a placeholder
    API_V1_STR: str = '/api/v1'
    DOCS_URL = API_V1_STR + '/docs'
    REDOC_URL = API_V1_STR + '/redoc'
    OPENAPI_URL = API_V1_STR + '/openapi_url'
    # Interface description shown on the docs page
    DESC = """
    XXXXXX message management system backend
    - Front end: built with the ANT VBEN framework
    - Back end: synchronous multithreaded mode + single-threaded coroutine mode
    - Technology stack:
        - [1]
        - [2]
        - [3] [message template module]
    """
    TAGS_METADATA = [
        # {
        #     "name": "XXXXXX message management module",
        #     "description": "...",
        # },
        # {
        #     "name": "XXXXX",
        #     "description": "XXXXX",
        #     "externalDocs": {
        #         "description": "sub-document information",
        #         "url": "https://fastapi.tiangolo.com/",
        #     },
        # },
    ]
    # Server/proxy configuration shown in the docs
    SERVERS = [
        {"url": "/", "description": "local debugging environment"},
        {"url": "https://xx.xx.com", "description": "online test environment"},
        {"url": "https://xx2.xx2.com", "description": "online production environment"},
    ]


@lru_cache()
def get_settings():
    return DocsSettings()


docs = get_settings()

pgdb_conf.py:

from functools import lru_cache

from pydantic import BaseSettings


class DatabaseSettings(BaseSettings):
    DEPLOY_HOST: str = '0.0.0.0'
    DEPLOY_PORT: int = 8888
    DEPLOY_DEBUG: bool = False
    DEPLOY_RELOAD: bool = False
    DEPLOY_ACCESS_LOG: bool = False


@lru_cache()
def get_settings():
    return DatabaseSettings()


pgconf = get_settings()

redis_conf.py:

import os
from functools import lru_cache

from pydantic import AnyUrl, BaseSettings


class RedisSettings(BaseSettings):
    DEPLOY_HOST: str = '0.0.0.0'
    DEPLOY_PORT: int = 8888
    DEPLOY_DEBUG: bool = False
    DEPLOY_RELOAD: bool = False
    DEPLOY_ACCESS_LOG: bool = False
    # Connection URL, e.g. redis://:[email protected]:6379/0?encoding=utf-8
    redis_url: AnyUrl = os.environ.get(
        "REDIS_URL", "redis://127.0.0.1:6379/0?encoding=utf-8"
    )
    redis_password: str = os.getenv("REDIS_PASSWORD", "")
    redis_db: int = int(os.getenv("REDIS_DB", "0"))
    # Whether to use Redis Sentinel
    redis_use_sentinel: bool = (
        True if os.getenv("REDIS_USE_SENTINEL", "0") == "1" else False
    )
    redis_sentinel_port: int = int(os.getenv("REDIS_SENTINEL_PORT", "26379"))
    redis_sentinel_url: str = os.getenv("REDIS_SENTINEL_URL", "")
    redis_sentinel_password: str = os.getenv("REDIS_SENTINEL_PASSWORD", "")
    redis_sentinel_master_name: str = os.getenv(
        "REDIS_SENTINEL_MASTER_NAME", "molmaster"
    )


@lru_cache()
def get_settings():
    return RedisSettings()


redisconf = get_settings()

That is about it for the configuration information.

2.2 App Example object creation

A FastAPI instance is an application service, and some things need to be configured when it is started. So the plan is as follows:

The main contents are as follows:

When creating the APP, we instantiate our plug-ins, import the API services, and so on. A detailed description of the method follows:

So our extensions will also be built on top of the registration function modules above.

2.2.1 Registering the log module

In an application, logs are an essential part of subsequent exception location, so we need to write our request logs locally to make it easy to trace back and locate problems later.

For the log module I use Loguru here. It is actually quite a good logging library that also supports asynchronous writing, so combined with async it should work out very well.

Questions to consider before defining the logging module:

  • Log storage directory
  • Logging format
  • Log rotation

Based on the above problems, we introduced our logging as a plug-in.

So under our ext module there is a corresponding log plug-in:

  • Define logging:

loger_config.py file contents:

from loguru import logger


def creat_customize_log_loguru(pro_path=None):
    '''
    :param pro_path: the path under which log files are generated
    :return:
    '''
    import os
    if not pro_path:
        # pro_path = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
        pro_path = os.path.split(os.path.realpath(__file__))[0]
    # Name of the info-level log file
    log_file_path = os.path.join(pro_path, 'log/info_{time:YYYYMMDD}.log')
    # Name of the error-level log file
    err_log_file_path = os.path.join(pro_path, 'log/error_{time:YYYYMMDD}.log')

    from sys import stdout
    LOGURU_FORMAT: str = (
        '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> '
        '| <level>{level: <16}</level> | <bold>{message}</bold>'
    )
    # This line is critical: replacing the default handler avoids writing
    # each log record more than once
    logger.configure(handlers=[{'sink': stdout, 'format': LOGURU_FORMAT}])
    # logger.remove() would also avoid duplicate writes, but then
    # app:register_logger has no console output
    # Error logs do not need to be compressed
    format = (
        "{time:YYYY-MM-DD HH:mm:ss:SSS} | process_id:{process.id} "
        "process_name:{process.name} | thread_id:{thread.id} "
        "thread_name:{thread.name} | {level} |\n {message}"
    )
    # enqueue=True writes through a queue, which is safe for async/multiprocess use
    logger.add(err_log_file_path, format=format, rotation='00:00',
               encoding='utf-8', level='ERROR', enqueue=True)  # rotate daily
    # A slightly different format for the info log
    format2 = (
        "{time:YYYY-MM-DD HH:mm:ss:SSS} | process_id:{process.id} "
        "process_name:{process.name} | thread_id:{thread.id} "
        "thread_name:{thread.name} | {level} | {message}"
    )
    logger.add(log_file_path, format=format2, rotation='00:00',
               compression="zip", encoding='utf-8', level='INFO',
               enqueue=True)  # rotate daily, compress rotated files

PS: note that when using this log handler, the line below is very important. Without it, log records would be written repeatedly, multiple times each:

logger.configure(handlers=[{'sink': stdout, 'format': LOGURU_FORMAT}])

  • Define logging objects:

Why is this kind of handling needed? I have mentioned in several previous articles the problem of logging in FastAPI middleware: if we use middleware to record logs, we cannot consume request: Request twice. My own logging requirements look like this:

2021-08-03 10:21:14:718 | process_id:24088 process_name:MainProcess | thread_id:2684 thread_name:MainThread | INFO | {"traceid": "DKVNm2JsxA2wVhY2hvFSQa", "trace_index": 1, "event_type": "request", "msg": {"useragent": {"os": "Windows 10", "browser": "QQ Browser 10.8.4405", "device": {"family": "Other", "brand": null, "model": null}}, "url": "/check", "method": "GET", "ip": "127.0.0.1", "params": {}, "ts": "2021-08-03 10:21:14"}}
2021-08-03 10:21:14:719 | process_id:24088 process_name:MainProcess | thread_id:2684 thread_name:MainThread | INFO | {"traceid": "DKVNm2JsxA2wVhY2hvFSQa", "trace_index": 2, "event_type": "response", "msg": {"status_code": 200, "cost_time": "0.00", "rsp": "ok", "ts": "2021-08-03 10:21:14"}}

Since my request log is kept separate from my response log, my approach differs somewhat from the one suggested by some experts. Instead of middleware, I record logs by way of a custom route class, ContextLogerRoute, which has access to both the request and the response around each handler.

File name:

contexr_logger_route.py

Contents of the document:

from time import perf_counter
from loguru import logger
from fastapi import APIRouter, FastAPI, Request, Response, Body
from fastapi.routing import APIRoute
from typing import Callable, List
from apps.utils import json_helper
import shortuuid
from datetime import datetime
from user_agents import parse
from urllib.parse import parse_qs


# Because FastAPI cannot consume the request a second time inside middleware,
# logging is done by way of a custom route class instead

class ContextLogerRoute(APIRoute):
    # Header keys whose values should be specially recorded for each request
    nesss_access_heads_keys = []

    # Wrap the writing of sequence-numbered log records, used for
    # full-link request tracing
    @staticmethod
    async def async_trace_add_log_record(request: Request, event_type='', msg={}, remarks=''):
        '''

        :param event_type: description of the logged event
        :param msg: dict holding the log record details
        :param remarks: log remarks
        :return:
        '''

        # print("my current request ID:", request.app.state.curr_request, id(request.app.state.curr_request))
        # print("my current request ID:", request, id(request))

        # If the marker attribute is absent, this endpoint needs no logging!
        if hasattr(request.state, 'traceid'):
            # Auto-incrementing index within the trace
            trace_links_index = request.state.trace_links_index = getattr(request.state, 'trace_links_index') + 1
            log = {
                # A custom parameter copied onto our request context object
                'traceid': getattr(request.state, 'traceid'),
                # Sequence number within the trace
                'trace_index': trace_links_index,
                # Event type description
                'event_type': event_type,
                # Log detail
                'msg': msg,
                # Log remarks
                'remarks': remarks,

            }
            # To keep records small, drop empty entries from the log
            if not remarks:
                log.pop('remarks')
            if not msg:
                log.pop('msg')
            try:
                log_msg = json_helper.dict_to_json_ensure_ascii(log)  # returns text
                logger.info(log_msg)
            except:
                logger.info(getattr(request.state, 'traceid') + ':index:' + str(getattr(request.state, 'trace_links_index')) + ':exception while writing the log record')

    async def _init_trace_start_log_record(self, request: Request):
        '''
        Initialize the request-side record
        :return:
        '''

        path_info = request.url.path
        if path_info not in ['/favicon.ico'] and 'websocket' not in path_info:
            if request.method != 'OPTIONS':
                # Trace index
                request.state.trace_links_index = 0
                # Trace ID
                # request.traceid = str(uuid.uuid4()).replace('-', '')
                request.state.traceid = shortuuid.uuid()
                # Start time, used to compute the cost later
                request.state.start_time = perf_counter()
                # Source IP, method and URL of the request
                ip, method, url = request.client.host, request.method, request.url.path
                # print('scope', request.scope)
                # First check whether the form carries any data:
                try:
                    body_form = await request.form()
                except:
                    body_form = None

                body = None
                try:
                    body_bytes = await request.body()
                    if body_bytes:
                        try:
                            body = await request.json()
                        except:
                            pass
                            if body_bytes:
                                try:
                                    body = body_bytes.decode('utf-8')
                                except:
                                    body = body_bytes.decode('gb2312')
                except:
                    pass

                # Parse the user-agent header for OS, browser and device details
                user_agent = parse(request.headers["user-agent"])
                browser = user_agent.browser.version
                if len(browser) >= 2:
                    browser_major, browser_minor = browser[0], browser[1]
                else:
                    browser_major, browser_minor = 0, 0

                user_os = user_agent.os.version
                if len(user_os) >= 2:
                    os_major, os_minor = user_os[0], user_os[1]
                else:
                    os_major, os_minor = 0, 0

                log_msg = {
                    # 'headers': str(request.headers),
                    # 'user_agent': str(request.user_agent),
                    # Record header details -- configure nesss_access_heads_keys
                    # if particular headers need to be recorded
                    'headers': [request.headers.get(i, '') for i in self.nesss_access_heads_keys] if self.nesss_access_heads_keys else None,
                    # Record the client environment
                    "useragent":
                        {
                            "os": "{} {}".format(user_agent.os.family, user_agent.os.version_string),
                            'browser': "{} {}".format(user_agent.browser.family, user_agent.browser.version_string),
                            "device": {
                                "family": user_agent.device.family,
                                "brand": user_agent.device.brand,
                                "model": user_agent.device.model,
                            }
                        },
                    # Record the request URL
                    'url': url,
                    # Record the request method
                    'method': method,
                    # Record the source IP
                    'ip': ip,
                    # 'path': request.path,
                    # Record the submitted parameters
                    'params': {
                        'query_params': parse_qs(str(request.query_params)),
                        'from': body_form,
                        'body': body
                    },
                    # Record the request start time
                    "ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
                    # 'start_time':  f'{(start_time)}',
                }
                if not log_msg['headers']:
                    log_msg.pop('headers')

                if not log_msg['params']['query_params']:
                    log_msg['params'].pop('query_params')
                if not log_msg['params']['from']:
                    log_msg['params'].pop('from')
                if not log_msg['params']['body']:
                    log_msg['params'].pop('body')
                # Write the detailed request log record
                await self.async_trace_add_log_record(request, event_type='request', msg=log_msg)

    async def _init_trace_end_log_record(self, request: Request, response: Response):

        # https://stackoverflow.com/questions/64115628/get-starlette-request-body-in-the-middleware-context
        # Only write the closing record for non-image responses
        # (media_type may be None for a bare Response, hence the extra guard)
        if (not response.media_type or 'image' not in response.media_type) and hasattr(request.state, 'traceid'):
            start_time = getattr(request.state, 'start_time')
            end_time = f'{(perf_counter() - start_time):.2f}'
            # Extract the response payload
            rsp = None
            if isinstance(response, Response):
                rsp = str(response.body, encoding='utf-8')
            log_msg = {
                # Record the response status and the request's cost in time
                "status_code": response.status_code,
                'cost_time': end_time,
                # The final response payload -- json_to_dict strips the
                # escaping, e.g. "\"ok\"" becomes ok
                'rsp': json_helper.json_to_dict(rsp),
                "ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
            }
            await self.async_trace_add_log_record(request, event_type='response', msg=log_msg)

    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        # The custom route handler
        async def custom_route_handler(request: Request) -> Response:
            # Pre-request handling - initialize the log record
            await self._init_trace_start_log_record(request)

            response: Response = await original_route_handler(request)

            # Post-request handling - write the closing log record
            await self._init_trace_end_log_record(request, response)

            return response

        return custom_route_handler

With the above plug-in, you can register it directly on the app object.

Having initialized our log object and our log configuration above, next we also need to initialize the logging tied to our routes:

Add the custom route class to our router to enable request logging:

With the above registered, whenever you need to write a log record you only need to call:


await ContextLogerRoute.async_trace_add_log_record(self.app.state.curr_request, event_type='Third party interface', msg=info_interface)

The verification example is as follows:

As shown above, we can log the entire request link!

2.2.2 Registering global exception capture

The global exception handler, which uniformly intercepts and captures exceptions and returns a unified response message, is absolutely key!

2.2.2.1 Exception Plug-ins – Extension Location:

2.2.2.2 Exception Plug-ins – Registration mode:

The idea: all of our plug-in extensions register themselves onto our app object, so that inside each extension we can reference the app for the related calls. If we follow this idea for our so-called plug-ins, we can define whatever we need very conveniently!

For example, our global exception object mainly uses the app's exception interception, so we can customize our own exception-catching class to handle all the related exceptions.

2.2.2.3 Exception Class Plug-in – Class implementation:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
Description: global exception-handling plug-in
Creator: Xiaozhong
Creation time: 2021/7/15
-------------------------------------------------
"""
from fastapi import FastAPI, Request
from apps.response.json_response import *
from starlette.exceptions import HTTPException as StarletteHTTPException
from fastapi.exceptions import HTTPException as FastapiHTTPException
from fastapi.exceptions import RequestValidationError
from pydantic.errors import *
from apps.ext.logger import logger
import traceback
from apps.utils.singleton_helper import Singleton


@Singleton
class ApiExceptionHandler():
    def __init__(self, app=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        if app is not None:
            self.init_app(app)

    def init_app(self, app: FastAPI):
        # The decorator alternative:
        # @app.exception_handler(StarletteHTTPException)
        # @app.exception_handler(RequestValidationError)
        # @app.exception_handler(Exception)
        app.add_exception_handler(Exception, handler=self.all_exception_handler)
        # StarletteHTTPException also covers e.g. 405 responses; that is why it goes here
        app.add_exception_handler(StarletteHTTPException, handler=self.http_exception_handler)
        app.add_exception_handler(RequestValidationError, handler=self.validation_exception_handler)

    async def validation_exception_handler(self, request: Request, exc: RequestValidationError):
        # Location of the first validation error:
        # print(exc.errors()[0].get('loc'))
        if isinstance(exc.raw_errors[0].exc, IntegerError):
            pass
        elif isinstance(exc.raw_errors[0].exc, MissingError):
            pass
        return ParameterException(http_status_code=400, api_code=400,
                                  message='parameter validation error',
                                  result={"detail": exc.errors(), "body": exc.body})

    async def all_exception_handler(self, request: Request, exc: Exception):
        '''
        Global catch-all exception handler
        :param request:
        :param exc:
        :return:
        '''
        logger.error(f"request URL: {request.url.path}\n"
                     f"error info: {traceback.format_exc()}")
        if isinstance(exc, StarletteHTTPException) or isinstance(exc, FastapiHTTPException):
            if exc.status_code == 405:
                return MethodnotallowedException()
            if exc.status_code == 404:
                return NotfoundException()
            elif exc.status_code == 429:
                return LimiterResException()
            elif exc.status_code == 500:
                return InternalErrorException()
            elif exc.status_code == 400:
                # In some places the code simply raises an HTTPException directly:
                # raise HTTPException(HTTP_400_BAD_REQUEST, 'Invalid token')
                return BadrequestException(msg=exc.detail)
            return BadrequestException()
        else:
            logger.exception(exc)
            traceback.print_exc()
            return InternalErrorException()

    async def http_exception_handler(self, request: Request, exc: StarletteHTTPException):
        '''
        Globally monitors all HTTP responses here, including everything other than 200
        :param request:
        :param exc:
        :return:
        '''
        logger.error(f"request URL: {request.url.path}\n"
                     f"error info: {traceback.format_exc()}")
        if exc.status_code == 405:
            return MethodnotallowedException()
        if exc.status_code == 404:
            return NotfoundException()
        elif exc.status_code == 429:
            return LimiterResException()
        elif exc.status_code == 500:
            return InternalErrorException()
        elif exc.status_code == 400:
            return BadrequestException(msg=exc.detail)

PS: the above can actually be implemented in two ways. One defines handler functions and registers them through add_exception_handler; the other uses the decorator form directly. For clarity we take the first approach!

2.2.2.4 Exception Plug-ins – Verification:

To verify, a function deliberately throws an exception:

Results:

Exception throw capture location:

Definition of the corresponding exception response message:

2.2.2.5 Adding the definition of the global response message:

  • Position of definition:

By default there are three kinds of JSON response classes; we separate them into three for convenience. Only one is shown here!

  • json_response.py
#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File name:        json_response
   Description:      function description
   Author:           Xiaozhong
   Created:          2021/7/15
-------------------------------------------------
   Modified 2021/7/15:
-------------------------------------------------
"""
from typing import Any, Dict, Optional
import time

# Customize the response body returned on errors
# (ORJSONResponse / orjson could be swapped in here)
from fastapi.responses import JSONResponse
from fastapi.encoders import jsonable_encoder


class ApiResponse(JSONResponse):
    # HTTP status code of the response -- if not specified, the default is always 200
    http_status_code = 200
    # Default business code for success
    api_code = 0
    # Default payload; subclasses override it with {} or [] where required
    result: Optional[Dict[str, Any]] = None
    message = 'success'
    success = True
    timestamp = int(time.time() * 1000)

    def __init__(self, success=None, http_status_code=None, api_code=None,
                 result=None, message=None, **options):
        if result:
            self.result = result
        if message:
            self.message = message
        if api_code:
            self.api_code = api_code
        if success is not None:
            self.success = success
        if http_status_code:
            self.http_status_code = http_status_code

        body = dict(message=self.message,
                    code=self.api_code,
                    success=self.success,
                    result=self.result,
                    timestamp=self.timestamp)
        # request="POST v1/client/register"
        # request=request.method + " " + self.get_url_no_param()
        # customize_headers = {
        #     # 'Access-Control-Allow-Origin': '*',
        #     'access-control-allow-methods': 'DELETE, GET, OPTIONS, PATCH, POST, PUT',
        #     'Access-Control-Allow-Origin': '*',
        #     'Access-Control-Allow-Headers': '*,X-Access-Token,School-Teacher-Token,school-teacher-token,'
        #                                     'T-Access-Token,x-access-token,Referer, Accept, Origin, User-Agent,'
        #                                     'X-Requested-With, Content-Type, X-File-Name',
        #     # 'Access-Control-Request-Headers': 'Content-Type,Access-Token',
        #     'Content-Type': 'application/json; charset=utf-8',
        # }
        # jsonable_encoder handles values plain json cannot, such as datetime timestamps
        super(ApiResponse, self).__init__(status_code=self.http_status_code,
                                          content=jsonable_encoder(body), **options)

    # render() is called automatically by the framework
    # def render(self, content: Any) -> bytes:
    #     return dict_to_json_ensure_ascii_indent(content)


class BadrequestException(ApiResponse):
    http_status_code = 400
    api_code = 10031
    result = None  # {} or []
    message = 'error request'
    success = False


class LimiterResException(ApiResponse):
    http_status_code = 429
    api_code = 429
    result = None  # {} or []
    message = 'access too fast'
    success = False


class ParameterException(ApiResponse):
    http_status_code = 400
    result = {}
    message = 'validation error'
    api_code = 10031
    success = False


class UnauthorizedException(ApiResponse):
    http_status_code = 401
    result = {}
    message = 'unauthorized authorization'
    api_code = 10032
    success = False


class ForbiddenException(ApiResponse):
    http_status_code = 403
    result = {}
    message = 'Failed! Current access does not have permission, or operation data does not have permission!'
    api_code = 10033
    success = False


class NotfoundException(ApiResponse):
    http_status_code = 404
    result = {}
    message = 'access address not found'
    api_code = 10034
    success = False


class MethodnotallowedException(ApiResponse):
    http_status_code = 405
    result = {}
    message = 'not allowed to submit access using this method'
    api_code = 10034
    success = False


class OtherException(ApiResponse):
    http_status_code = 800
    result = {}
    message = 'unknown other HTTPError exception'
    api_code = 10034
    success = False


class InternalErrorException(ApiResponse):
    http_status_code = 500
    result = {}
    message = "Programmer didn't get enough sleep, system crashed!"
    api_code = 500
    success = False


class InvalidTokenException(ApiResponse):
    http_status_code = 401
    api_code = 401
    message = 'no operation, token invalid'
    success = False


class ExpiredTokenException(ApiResponse):
    http_status_code = 422
    message = 'token expired'
    api_code = 10050
    success = False


class FileTooLargeException(ApiResponse):
    http_status_code = 413
    api_code = 413
    result = None  # {} or []
    message = 'file size too large'


class FileTooManyException(ApiResponse):
    http_status_code = 413
    message = 'too many files'
    api_code = 10120
    result = None  # {} or []


class FileExtensionException(ApiResponse):
    http_status_code = 401
    message = 'file extension is invalid'
    api_code = 10121
    result = None  # {} or []


class Success(ApiResponse):
    http_status_code = 200
    api_code = 200
    result = None  # {} or []
    message = 'custom return with success'
    success = True


class Fail(ApiResponse):
    http_status_code = 200
    api_code = 200
    result = None  # {} or []
    message = 'custom return with failure'
    success = False
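To see how these subclasses compose, here is a minimal stdlib-only sketch of the attribute-override pattern `ApiResponse` relies on; the `Sketch*` class names are illustrative stand-ins, not the project's real classes:

```python
import time

class SketchResponse:
    # Class-level defaults, overridden per subclass (mirrors ApiResponse)
    http_status_code = 200
    api_code = 0
    result = None
    message = 'success'
    success = True

    def body(self):
        # The unified response envelope every endpoint returns
        return dict(message=self.message, code=self.api_code,
                    success=self.success, result=self.result,
                    timestamp=int(time.time() * 1000))

class SketchNotFound(SketchResponse):
    # Only the fields that differ from the defaults are overridden
    http_status_code = 404
    api_code = 10034
    message = 'access address not found'
    success = False

resp = SketchNotFound()
print(resp.http_status_code, resp.body()['code'])  # 404 10034
```

Each concrete exception class therefore only declares the fields that differ, and the base class builds the same envelope for all of them.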

Note:

One thing to note above is that `jsonable_encoder` is introduced to handle values that are not natively JSON-serializable, such as datetime objects.
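The problem `jsonable_encoder` solves can be reproduced with the standard library alone: `json.dumps` cannot serialize a datetime by itself, so a converter has to normalize such values first. The helper below is a simplified stand-in, not FastAPI's actual implementation:

```python
import json
import datetime

def to_jsonable(value):
    # Minimal stand-in for fastapi.encoders.jsonable_encoder:
    # recursively convert values json.dumps cannot handle on its own
    if isinstance(value, dict):
        return {k: to_jsonable(v) for k, v in value.items()}
    if isinstance(value, (list, tuple)):
        return [to_jsonable(v) for v in value]
    if isinstance(value, (datetime.datetime, datetime.date)):
        return value.isoformat()
    return value

body = {'message': 'success', 'created_at': datetime.datetime(2021, 7, 15, 12, 0)}
# json.dumps(body) would raise TypeError; after conversion it works:
print(json.dumps(to_jsonable(body)))  # {"message": "success", "created_at": "2021-07-15T12:00:00"}
```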

2.2.3 Global Cross-Domain (CORS) Settings

Cross-domain access can be restricted to specific addresses; `allow_origins` is used to set the built-in cross-origin whitelist.

2.2.4 Registering global middleware

In general, global middleware handles authentication for all interfaces, so using middleware is the most convenient way to implement authentication.

Here we use globally authenticated plug-ins to illustrate:

2.2.4.1 Global Middleware – Location of authentication middleware:

2.2.4.2 Global Middleware – Registration Mode:

2.2.4.3 Global Middleware – Implementation Classes:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File name:        auth
   Description:      function description
   Author:           Xiaozhong
   Created:          2021/6/7
-------------------------------------------------
   Modified 2021/6/7:
-------------------------------------------------
"""
from time import perf_counter
from loguru import logger
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from apps.config.auth_conf import auth as auth_conf
from apps.response.json_response import ForbiddenException, InvalidTokenException, ExpiredTokenException, JSONResponse
from apps.ext.jwt.simple_auth import SimpleAuth as Auth
from fastapi import HTTPException
from starlette.status import HTTP_400_BAD_REQUEST


class AuthMiddleware(BaseHTTPMiddleware):

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Authentication mode -- 1: use the default Authorization header, 2: use a custom header
        self.auth_type = 2

    def check_auth_token(self, request):
        """
        Extract the token from the request headers, skipping whitelisted routes.
        :param request:
        :return: the token, or a ForbiddenException response when it is missing
        """
        while_auth_ulr = auth_conf.ADMIN_WHILE_ROUTE
        # Skip whitelisted routes, the captcha endpoint and the docs pages
        if request.url.path not in while_auth_ulr \
                and 'sys/randomImag' not in request.url.path \
                and 'docs' not in request.url.path:
            if self.auth_type == 1:
                token = request.headers.get('Authorization', None)
                if not token:
                    return ForbiddenException()
            else:
                token = request.headers.get('x-access-token', None)
                if not token:
                    return ForbiddenException()
                    # raise HTTPException(HTTP_400_BAD_REQUEST, 'Invalid token')
        return token

    def authenticate_credentials(self, token):
        """
        Verify the token itself.
        :return: the user info carried in the token, or an error response
        """
        isok, state, token_userinfo_result = Auth.verify_bearer_token_state(token=token)
        if not isok and state == 1:
            return InvalidTokenException()
        if not isok and state == 2:
            return ExpiredTokenException()
        return token_userinfo_result

    async def authenticate_credentials_user_info(self, token_userinfo_result):
        """
        Validate the user information carried inside the token.
        :param token_userinfo_result:
        :return:
        """
        isok, isstatus = False, 2
        if not isok:
            return ForbiddenException(msg='This user does not exist, please contact the administrator!')
        # if isstatus.get('status') == 2:
        #     return ForbiddenException(msg='This user has been frozen, please contact the administrator!')

    async def dispatch(self, request: Request, call_next):
        # if isinstance(token_result, JSONResponse):
        #     return token_result
        # 1: Check the authentication information. If it is missing or invalid an error
        #    response is returned; otherwise the corresponding token value is used.
        while_auth_ulr = auth_conf.ADMIN_WHILE_ROUTE
        # print('while_auth_ulr', while_auth_ulr)
        if request.scope["method"] != 'OPTIONS' \
                and request.url.path not in while_auth_ulr \
                and 'sys/randomImag' not in request.url.path \
                and 'docs' not in request.url.path:
            if self.auth_type == 1:
                token = request.headers.get('Authorization', None)
            else:
                token = request.headers.get('x-access-token', None)
            # print("token", token)
            if not token:
                return ForbiddenException()
            isok, state, token_userinfo_result = Auth.verify_bearer_token_state(token=token)
            if not isok and state == 1:
                return InvalidTokenException()
            if not isok and state == 2:
                return ExpiredTokenException()
            # Attach the decoded user info to the state of the current request
            request.state.token_userinfo_result = token_userinfo_result
        response = await call_next(request)
        return response
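Stripped of the framework plumbing, the dispatch decision above boils down to a whitelist check plus a header lookup. The stdlib-only sketch below captures that logic; the function name, header name and whitelist values are illustrative:

```python
def check_request(path: str, headers: dict, whitelist: set,
                  token_header: str = 'x-access-token'):
    """Return ('pass', token) when auth should proceed, or a short error tag."""
    # Whitelisted routes and the docs pages skip authentication entirely
    if path in whitelist or 'docs' in path:
        return ('skip', None)
    token = headers.get(token_header)
    if not token:
        return ('forbidden', None)   # maps to ForbiddenException above
    return ('pass', token)           # token goes on to signature/expiry checks

print(check_request('/docs', {}, set()))                            # ('skip', None)
print(check_request('/v1/user', {}, {'/login'}))                    # ('forbidden', None)
print(check_request('/v1/user', {'x-access-token': 'abc'}, set()))  # ('pass', 'abc')
```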

2.2.4.4 Global Middleware – Authentication verification:

Add an interface that is not whitelisted:

Enable our authentication middleware:

Access a non-whitelisted address:

2.2.5 Registering global start and close events

In fact, there is no special global processing here, so there is no content. Of course, things such as databases will need startup/shutdown handling later, but not in this way — they are also handled as plug-ins! So there is nothing to expand on here; feel free to skip it.

2.2.6 Registering a global third-party Extension Instance

Here are examples of third-party extensions that can be registered and instantiated according to your own needs.

For example, a custom async_client implementation can be instantiated so that the object instance can be used directly anywhere in the project.

2.2.6.1 Plugin AsynClientSession instance – Location:

2.2.6.2 Plug-in AsynClientSession instance – Registration:

2.2.6.3 Plug-in AsynClientSession instance – Class Implementation:

This custom implementation is designed to add logging for third-party interface requests.

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File name:        __init__.py
   Description:      function description
   Author:           Xiaozhong
   Created:          2021/7/16
-------------------------------------------------
   Modified 2021/7/16: async HTTP client with request-log wrapping
-------------------------------------------------
"""
from dataclasses import dataclass
from aiohttp import ClientSession
from dotmap import DotMap
import traceback
import aiohttp
from apps.ext.logger.contexr_logger_route import ContextLogerRoute
from fastapi import Request, FastAPI
from apps.utils.singleton_helper import Singleton
from urllib.parse import parse_qs


@Singleton
@dataclass
class AsynClientSession():

    def __init__(self, aiohttp_session: ClientSession = None, app: FastAPI = None):
        self.session = aiohttp_session
        # If an app is passed in, initialize directly
        if app is not None:
            self.init_app(app)

    def init_app(self, app: FastAPI):
        self.app = app

    async def request(self, api_url, method='GET', headers={}, islogrecord=False, params=None):
        try:
            if islogrecord and not getattr(self.app.state, 'curr_request'):
                raise Exception('The FastAPI app object must be passed in, and the context '
                                'middleware must register the global request object')
            if not self.session:
                # Use a one-off session to avoid "Unclosed client session" warnings
                async with aiohttp.ClientSession() as session:
                    async with session.request(url=api_url, method=method, headers=headers, params=params) as resp:
                        resp.raise_for_status()
                        if resp.status in (401, 403):
                            raise Exception('Authentication error!')
                        # print('resp.content_type', resp.content_type)
                        try:
                            response = await resp.json()
                        except:
                            response = await resp.text()
                        # Record the third-party request in the log
                        if islogrecord and self.app:
                            info_interface = {
                                'url': api_url,
                                'method': method,
                                'headers': str(headers) if headers else '',
                                'params': parse_qs(str(params)),
                                'state_code': str(resp.status),
                                'result': response,
                            }
                            await ContextLogerRoute.async_trace_add_log_record(
                                self.app.state.curr_request,
                                event_type='Third party interface', msg=info_interface)
            else:
                async with self.session.request(url=api_url, method=method, headers=headers, params=params) as resp:
                    resp.raise_for_status()
                    if resp.status in (401, 403):
                        raise Exception('Authentication error!')
                    response = await resp.json()
                    # await self.session.close()
            return response
        except Exception:
            traceback.print_exc()


async_client = AsynClientSession()

if __name__ == '__main__':
    from asyncio import run

    async def main():
        results = await async_client.request(api_url='http://127.0.0.1:8080/check', islogrecord=False)
        print(results)

    run(main())

2.2.6.4 Plug-in AsynClientSession instance – Verification:

View the request log information:

2.2.7 Importing Registered Routes in Batches

Because a project may contain many routes, registering them one by one with

app.include_router(router)

is not something I personally like! So, following my earlier Flask approach, I do batch import registration: find a given instance-object attribute under each module, and uniformly call

app.include_router(router)

The specific implementation method is as follows:

2.2.7.1 Specify import Modules:

2.2.7.2 Importing Tool Classes in Batches – Specify the project directory to be imported.

Given the directory to scan, traverse it looking for modules that expose a

bp

instance attribute, and then dynamically mount it:

router = getattr(module, key_attribute)
# router.route_class = ContextLogerRoute
app.include_router(router)
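The mount step can be sketched with the standard library alone: iterate candidate modules, keep those exposing a `bp` attribute, and collect their routers for registration. The fake modules below stand in for real API packages, and the string routers stand in for `APIRouter` instances:

```python
import types

def collect_routers(modules, key_attribute='bp'):
    # Keep only modules that define the router attribute, mirroring
    # the hasattr(module, key_attribute) check in the scaffold
    return [getattr(m, key_attribute) for m in modules if hasattr(m, key_attribute)]

# Two fake modules standing in for apps/api/* packages
mod_user = types.ModuleType('apps.api.user')
mod_user.bp = 'user-router'                   # in the real project this is an APIRouter
mod_misc = types.ModuleType('apps.api.misc')  # no `bp` attribute -> skipped

routers = collect_routers([mod_user, mod_misc])
print(routers)  # ['user-router']
# each collected router would then be passed to app.include_router(...)
```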

2.2.7.3 Importing Tool Classes in Batches – Implementation:

from fastapi import FastAPI
from fastapi import APIRouter
from ZtjDirImport import DirImport
from apps.utils.modules_helper import find_modules, import_string


def print_all_routes_info(app: FastAPI):
    for ro in app.routes:
        print('name:', ro.name, '=====>', 'path:', ro.path)


def register_nestable_blueprint(app=None, project_name=None, api_name='api',
                                key_attribute='bp', hongtu='hongtu'):
    """
    Automatically import blueprint modules
    :param app:
    :return:
    """
    if not app:
        import warnings
        warnings.warn('Route registration failed: a Flask object instance must be passed in')
        return None
    if project_name:
        # include_packages=True is critical: it also detects attributes
        # defined inside __init__.py
        modules = find_modules(f'{project_name}.{api_name}', include_packages=True, recursive=True)
        for name in modules:
            module = import_string(name)
            if hasattr(module, key_attribute):
                # app.register_blueprint(module.mmpbp)
                # lantu = getattr(module, key_attribute)
                # print('sdasda', getattr(module, key_attribute).__dict__)
                app.include_router(getattr(module, key_attribute))
                # app.register_blueprint(getattr(module, key_attribute))
            if hasattr(module, hongtu):
                pass
                # print('matched', name)
                # getattr(module, hongtu).register(lantu)
    else:
        import warnings
        warnings.warn('Route registration failed: the external project name is not defined yet')


def register_nestable_blueprint_for_log(app=None, project_name=None, api_name='api',
                                        scan_name='api', key_attribute='bp', hongtu='hongtu'):
    """
    Automatically import blueprint modules
    :param app:
    :return:
    """
    if not app:
        import warnings
        warnings.warn('Route registration failed: a FastAPI object instance must be passed in')
        return None
    if project_name:
        # include_packages=True is critical: it also detects attributes
        # defined inside __init__.py
        modules = find_modules(f'{project_name}.{api_name}', include_packages=True, recursive=True)
        from apps.ext.logger.contexr_logger_route import ContextLogerRoute
        for name in modules:
            module = import_string(name)
            # if not name.endswith(scan_name):
            #     continue
            if hasattr(module, key_attribute):
                # app.register_blueprint(module.mmpbp)
                # lantu = getattr(module, key_attribute)
                router = getattr(module, key_attribute)
                # Mounted globally; the route class can record request context logs
                # router.route_class = ContextLogerRoute
                app.include_router(router)
                # app.register_blueprint(getattr(module, key_attribute))
            if hasattr(module, hongtu):
                pass
                # print('matched', name)
                # getattr(module, hongtu).register(lantu)
    else:
        import warnings
        warnings.warn('Route registration failed: the external project name is not defined yet')

The helper functions used here,

from apps.utils.modules_helper import find_modules, import_string

mainly borrow Flask's (Werkzeug's) code:

import sys
# from werkzeug.utils import find_modules, import_string
import pkgutil


def import_string(import_name, silent=False):
    """Imports an object based on a string.  This is useful if you want to
    use import paths as endpoints or something similar.  An import path can
    be specified either in dotted notation (``xml.sax.saxutils.escape``)
    or with a colon as object delimiter (``xml.sax.saxutils:escape``).

    If `silent` is True the return value will be `None` if the import fails.

    :param import_name: the dotted name for the object to import.
    :param silent: if set to `True` import errors are ignored and
                   `None` is returned instead.
    :return: imported object
    """
    # force the import name to automatically convert to strings
    # __import__ is not able to handle unicode strings in the fromlist
    # if the module is a package
    import_name = str(import_name).replace(":", ".")
    try:
        try:
            __import__(import_name)
        except ImportError:
            if "." not in import_name:
                raise
        else:
            return sys.modules[import_name]

        module_name, obj_name = import_name.rsplit(".", 1)
        module = __import__(module_name, globals(), locals(), [obj_name])
        try:
            return getattr(module, obj_name)
        except AttributeError as e:
            raise ImportError(e)
    except ImportError as e:
        if not silent:
            raise
    return None


def find_modules(import_path, include_packages=False, recursive=False):
    """Finds all the modules below a package.  This can be useful to
    automatically import all views / controllers so that their metaclasses /
    function decorators have a chance to register themselves on the
    application.

    Packages are not returned unless `include_packages` is `True`.  This can
    also recursively list modules but in that case it will import all the
    packages to get the correct load path of that module.

    :param import_path: the dotted name for the package to find child modules.
    :param include_packages: set to `True` if packages should be returned, too.
    :param recursive: set to `True` if recursion should happen.
    :return: generator
    """
    module = import_string(import_path)
    path = getattr(module, "__path__", None)
    if path is None:
        raise ValueError("%r is not a package" % import_path)
    basename = module.__name__ + "."
    for _importer, modname, ispkg in pkgutil.iter_modules(path):
        modname = basename + modname
        if ispkg:
            if include_packages:
                yield modname
            if recursive:
                for item in find_modules(modname, include_packages, True):
                    yield item
        else:
            yield modname


def get_modules(package="."):
    """List module names under a directory"""
    import os
    modules = []
    files = os.listdir(package)
    for file in files:
        if not file.startswith("__"):
            name, ext = os.path.splitext(file)
            modules.append(name)
            print("name", name)
    return modules
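The core of `import_string` is just module import plus attribute lookup; the equivalence can be checked against the standard library. This condensed version skips the error handling of the full helper above:

```python
import importlib

def import_string_mini(dotted: str):
    # "json.dumps" -> import json, then getattr(json, "dumps")
    module_name, _, obj_name = dotted.rpartition('.')
    module = importlib.import_module(module_name)
    return getattr(module, obj_name)

func = import_string_mini('json.dumps')
print(func({'a': 1}))  # {"a": 1}
```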

2.2.7.4 Importing Tool Classes in Batches – Final Result

Definition of project interface:

Examples of interface definitions:

Import API definitions directly without too much intervention:

The above is a brief description of batch route import; I will expand on defining the API itself when I get the chance!

2.3 Plug-in asynchronous Redis sample extension

Our asynchronous Redis is also registered and instantiated as a plug-in: the app object is passed in, the plug-in takes over registration of the corresponding hook functions, and the related event handling happens inside those hooks.
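The registration pattern itself is independent of Redis: a plug-in either receives the app in its constructor or later via `init_app`, and only then hooks into the app's startup/shutdown events. Below is a stdlib-only sketch of that lifecycle; `FakeApp` and `RedisPlugin` are illustrative stand-ins, not FastAPI's real classes:

```python
class FakeApp:
    # Minimal stand-in for FastAPI's on_event hook registry
    def __init__(self):
        self.hooks = {'startup': [], 'shutdown': []}

    def on_event(self, name):
        def register(fn):
            self.hooks[name].append(fn)
            return fn
        return register

class RedisPlugin:
    def __init__(self, app=None):
        self.connected = False
        if app is not None:
            self.init_app(app)   # initialization may also be deferred

    def init_app(self, app):
        @app.on_event('startup')
        def connect():
            self.connected = True    # the real plug-in builds the pool here

        @app.on_event('shutdown')
        def disconnect():
            self.connected = False   # the real plug-in closes the pool here

app = FakeApp()
plugin = RedisPlugin()
plugin.init_app(app)                 # deferred registration, Flask-extension style
for hook in app.hooks['startup']:
    hook()
print(plugin.connected)  # True
```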

2.3.1 Location of plug-in definition:

2.3.2 Plug-in Class Implementation Definition:

#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File name:        __init__.py
   Description:      function description
   Author:           Xiaozhong
   Created:          2021/7/27
-------------------------------------------------
   Modified 2021/7/27:
-------------------------------------------------
 # await app.state.redis.set("my-key", "valueaaaaaaaaaaaa")
            # value = await app.state.redis.get("my-key")
            # print(value)
            # HASH operations
            # await self.hmset_dict("hash", key1="value1", key2="value2", key3=123)
            # result = await self.hgetall("hash")
            # print("HASH result", result)

            # result  = await self.add_str_ex('sdsds','sssssssssssssss')
            # print(result)
            # value = await app.state.redis.get("sdsds")
            # print(value)
"""
from aioredis import Redis, create_redis_pool, create_sentinel
from apps.config.redis_conf import redisconf
from typing import Tuple, Any
from fastapi import FastAPI
from apps.utils.singleton_helper import Singleton
from contextlib import asynccontextmanager
import asyncio
import json
import datetime
from typing import Set, Any, Optional

# from functools import cached_property, lru_cache
# cached_property requires Python 3.8

@Singleton
class AsyncRedisClient():

    def __init__(self, app: FastAPI = None):
        # If an app is passed in, initialize directly
        self.redis = None
        if app is not None:
            self.init_app(app)

    def init_app(self, app: FastAPI):
        self.app = app

        @app.on_event("startup")
        async def startup_event():
            app.state.redis = await self.init_redis_pool()

            # Initialize the cache object
            from apps.ext.cache import FastAPICache
            from apps.ext.cache.backends.redis import RedisBackend
            FastAPICache.init(RedisBackend(self.redis_db), prefix="xxx-cache")

            # Login-status bitmap
            await self.setbit('login_status', 100010, 1)
            await self.setbit('login_status', 100011, 1)

            erer = await self.getbit('login_status', 100011)
            print('online status of 100011', erer)
            erer = await self.getbit('login_status', 100012)
            print('online status of 100012', erer)

            erer = await self.getbit('login_status', 100010)
            print('online status of 100010', erer)
            await self.setbit('login_status', 100010, 0)

            erer = await self.getbit('login_status', 100010)
            print('online status of 100010', erer)

            # Sign-in handling
            # The key can be designed as uid:sign:{userId}:{yyyyMM}; each day of the month
            # minus 1 is used as the offset (offsets start at 0, so offset = day - 1).
            await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 15, 1)
            await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 14, 1)
            await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 13, 1)
            await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 12, 1)
            await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 11, 1)
            erer = await self.getbit('uid:sign:{0}:{1}'.format(100010, 202105), 15)
            print('sign-in of 100010 on day 16 of 202105', erer)
            erer = await self.getbit('uid:sign:{0}:{1}'.format(100010, 202105), 11)
            print('sign-in of 100010 on day 12 of 202105', erer)
            erer = await self.bitcount('uid:sign:{0}:{1}'.format(100010, 202105))
            print('total sign-ins of 100010 in 202105', erer)

            erer = await self.bitpos('uid:sign:{0}:{1}'.format(100010, 202105), 1)
            print('first sign-in date of 100010 in 202105 (first bit set to 1, should be 11)', erer)
            await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 11, 0)
            erer = await self.bitpos('uid:sign:{0}:{1}'.format(100010, 202105), 1)
            print('first sign-in date of 100010 in 202105 after clearing bit 11', erer)

            erer = await self.set_sign_status(100010)
            print('sign-in of 100010:', erer)
            erer = await self.get_sign_status(100010)
            print('sign-in status of 100010:', erer)

            erer = await self.get_user_week_sign_status(100010)
            print("100010's sign-ins this week:", erer)

            erer = await self.get_user_month_sign_status(100010)
            print("100010's sign-ins this month:", erer)


        @app.on_event("shutdown")
        async def shutdown_event():
            app.state.redis.close()
            await app.state.redis.wait_closed()

    async def get_redis(self):
        return self.redis

    # @cached_property
    async def init_redis_pool(self) -> Redis:

        if redisconf.use_redis_sentinel:
            # Create a connection object under the sentinel model
            sentinel = await create_sentinel(
                [(redisconf.redis_sentinel_url, redisconf.redis_sentinel_port)],
                db=redisconf.redis_db,
                password=redisconf.redis_password,
                encoding="utf-8",
            )
            self.redis_db = sentinel.master_for(redisconf.redis_sentinel_master_name)
        else:
            # Create a connection pool
            self.redis_db = await create_redis_pool(
                redisconf.redis_url,
                # password=redisconf.redis_password,
                # encoding="utf-8",
                # db=redisconf.redis_db,
            )


        return self.redis_db

    async def get_with_ttl(self, key: str) -> Tuple[int, str]:
        async with self.redis_db.pipeline(transaction=True) as pipe:
            return await (pipe.ttl(key).get(key).execute())

    async def get(self, key) -> str:
        return await self.redis_db.get(key)

    async def set(self, key: str, value: str, expire: int = None):
        return await self.redis_db.set(key, value, ex=expire)

    async def setex(self, key, seconds, value):
        print("ssssssssss")
        return await self.redis_db.setex(key, seconds, value)

    async def pttl(self, key: str) -> int:
        """Get PTTL from a Key"""
        return int(await self.redis_db.pttl(key))

    async def ttl(self, key: str) -> int:
        """Get TTL from a Key"""
        return int(await self.redis_db.ttl(key))

    async def pexpire(self, key: str, pexpire: int) -> bool:

        return bool(await self.redis_db.pexpire(key, pexpire))

    async def expire(self, key: str, expire: int) -> bool:

        return bool(await self.redis_db.expire(key, expire))

    async def incr(self, key: str) -> int:
        """Increases an Int Key"""
        return int(await self.redis_db.incr(key))

    async def decr(self, key: str) -> int:
        """Decreases an Int Key"""
        return int(await self.redis_db.decr(key))

    async def hmset_dict(self, key, **val) -> str:
        return await self.redis_db.hmset_dict(key, **val)

    async def hgetall(self, key, ):
        return await self.redis_db.hgetall(key, encoding="utf-8")

    # Set only if the key does not exist (SETNX); otherwise leave it unchanged
    async def add_str_nx(self, key, values):  # value may be complex JSON
        return await self.redis_db.setnx(key, values)

    # Set the cache (replacing an existing value) with an expiry time
    async def add_str_ex(self, key, values, time=10):  # value may be complex JSON
        return await self.redis_db.setex(key, time, values)

    async def clear(self, namespace: str = None, key: str = None) -> int:
        if namespace:
            lua = f"for i, name in ipairs(redis.call('KEYS', '{namespace}:*')) do redis.call('DEL', name); end"
            return await self.redis_db.eval(lua, numkeys=0)
        elif key:
            return await self.redis_db.delete(key)

    async def check_lock(self, key):
        """
        Check whether the current key is locked
        """
        key = 'lock:%s' % key
        status = await self.redis_db.get(key)
        if status:
            return True
        else:
            return False

    async def acquire_lock(self, key, expire=30, step=0.03):
        """
        Lock the current key; unlocks automatically after 30 seconds by default
        """
        key = 'lock:%s' % key
        while 1:
            get_stored = await self.redis_db.get(key)
            if get_stored:
                await asyncio.sleep(step)
            else:
                lock = await self.redis_db.setnx(key, 1)
                if lock:
                    await self.redis_db.expire(key, expire)
                    return True

    async def release_lock(self, key):
        """
        Release the lock on the current key
        """
        key = 'lock:%s' % key
        await self.safe_delete(key)

    @asynccontextmanager
    async def with_lock(self, key, expire=30, step=0.03):
        """
        @desc Redis distributed-lock wrapper
        :param key: cache key
        :param expire: lock expiry time
        :param step: interval between lock-acquisition attempts
        :return:
        for example:
        with RedisCacheProxy().with_lock("key_name") as lock:
            "do something"
        """
        try:
            t = await self.acquire_lock(key, expire, step)
            yield t
        finally:
            await self.release_lock(key)

    async def get_many(self, keys: list) -> list:
        """
        @desc Batch-get string values
        :params keys: [chan1, char2]
        """
        data = await self.redis_db.mget(*keys, encoding="utf-8")
        return data

    async def set_many(self, data: dict):
        """Batch-set string caches"""
        data = await self.redis_db.mset(data)
        return data

    async def get_data(self, key: str) -> str:
        """Get string data and try to decode it as JSON"""
        value = await self.redis_db.get(key)
        if value:
            try:
                value = json.loads(value.decode("utf-8"))
            except:
                pass
        return value

    async def set_data(self, key: str, value, ex: int = None):
        """Try to store the value as a JSON string"""
        try:
            value = json.dumps(value)
        except:
            pass
        return await self.redis_db.set(key, value, ex=ex)

    async def delete(self, key):
        """Delete a key directly"""
        await self.redis_db.delete(key)

    async def safe_delete(self, key: str):
        """Expire a key"""
        await self.redis_db.expire(key, -1)

    async def delete_many(self, keys: list) -> None:
        """Expire keys in batch"""
        await self.redis_db.delete(*keys)

    async def exists(self, key: str) -> bool:
        """Check whether a key exists"""
        data = await self.redis_db.exists(key)
        return data

    def hget(self, key: str, field: str):
        """Get a single value of a hash key"""
        return self.redis_db.hget(key, field)

    def hmget(self, key: str, fields: list):
        """
        Batch-get hash values
        :param key:
        :param fields:
        :return:
        """
        return self.redis_db.hmget(key, fields)

    async def hget_data(self, key: str, field: str) -> Any:
        """Get a single field of a hash"""
        data = await self.redis_db.hget(key, field)
        return json.loads(data) if data else None

    async def hmget_data(self, key: str, fields: list) -> list:
        """
        @desc Get hash values from the cache as a list
        """
        data = await self.redis_db.hmget(key, *fields)
        return [json.loads(i) if i is not None else None for i in data]

    async def hmget2dict_data(self, key: str, fields: list) -> dict:
        """
        @desc Get hash values from the cache as a dict, attempting JSON decoding
        """
        cache_list = await self.redis_db.hmget(key, fields)
        return dict(zip(fields, [json.loads(i) if i is not None else None for i in cache_list]))

    async def get_json(self, key: str) -> dict:
        """
        @desc Get JSON-formatted dict data
        """
        data = await self.redis_db.hgetall(key)
        if data:
            return {k: json.loads(v) for k, v in dict(data).items()}
        return {}

    async def set_json(self, key: str, value: dict, ex: int = None):
        """
        @desc Store JSON-structured data in a hash
        :return:
        """
        cache_data = []
        for k, v in value.items():
            cache_data.extend([k, json.dumps(v)])
        if not cache_data:
            return True
        pipe = self.redis_db.pipeline()
        pipe.hmset(key, *cache_data)
        if ex:
            pipe.expire(key, int(ex))
        res = await pipe.execute()
        return res

    async def sadd(self, key: str, values: list) -> int:
        """Add members to a set"""
        if not values:
            return 0
        count = await self.redis_db.sadd(key, *values)
        return count

    async def spop(self, key: str, count: int = None) -> list:
        """Pop members from a set"""
        count = 1 if not count else count
        values = await self.redis_db.spop(key, count=count)
        return values if values else []

    async def smembers(self, key: str) -> list:
        """Return all members of a set"""
        values = await self.redis_db.smembers(key)
        return values if values else []

    async def smembers_back_set(self, key: str) -> Set:
        """Gets Set Members"""
        return set(await self.redis_db.smembers(key))

    async def scard(self, key: str) -> int:
        """Get the number of members of a set"""
        count = await self.redis_db.scard(key)
        return count

    async def zadd(self, key, *args, **kwargs):
        # Redis ZADD (batch-set values into the sorted set)
        if not (args or kwargs):
            return False
        count = await self.redis_db.zadd(key, *args, **kwargs)
        return count

    async def zrem(self, key, member, *members):
        # Redis ZREM (remove specific members from the sorted set)
        if not key:
            return False
        count = await self.redis_db.zrem(key, member, *members)
        return count

    async def zincrby(self, key, name, value, amount=1):
        # If the member already exists in the sorted set, increase its score by
        # amount; otherwise add the member with score = amount
        if not (name or value):
            return False
        return await self.redis_db.zincrby(key, value, amount)

    async def zrevrank(self, key, value):
        if not value:
            return False
        return await self.redis_db.zrevrank(key, value)

    async def zscore(self, key, member):
        if not member:
            return False
        return await self.redis_db.zscore(key, member)

    async def setbit(self, key: str, offset: int, value: int) -> int:
        """
        1: Set or clear the bit of the key's value at the given offset (only 0 or 1)
        2: A single key such as login_status can store the whole login-status set:
           use the user ID as the offset, set 1 when online and 0 when offline
        3: Note that offsets start at 0
        """
        count = await self.redis_db.setbit(key, offset, value)
        return count

    async def getbit(self, key: str, offset: int) -> int:
        """
        1: Get the bit of the key's value at the given offset; returns 0 when the key does not exist
        """
        count = await self.redis_db.getbit(key, offset)
        return count

    async def bitcount(self, key: Any) -> int:
        """
        Count the number of bits set to 1 in the given bit array.
        """
        count = await self.redis_db.bitcount(key)
        return count

    async def bitpos(self, key: Any, bit: Any, start=None, end=None) -> int:
        """
        1. Return the offset of the first bit equal to `bit` in the Bitmap.
        2. By default the whole bitmap is scanned; the optional start and end
           parameters restrict the range that is checked.
        """
        count = await self.redis_db.bitpos(key, bit, start=start, end=end)
        return count

    # Sign-in handling
    async def set_sign_status(self, user_id: int, _sign_key='sign_in', day=None, status=1) -> int:
        # User sign-in: one Bitmap key per date, user ID as the bit offset
        if not day:
            day = str(datetime.datetime.now())[:10]
        return await self.setbit('{}:{}'.format(_sign_key, day), user_id, status)

    # Get a user's sign-in status for a given day (defaults to today)
    async def get_sign_status(self, user_id: int, _sign_key='sign_in', day=None) -> int:
        if not day:
            day = str(datetime.datetime.now())[:10]
        return await self.getbit('{}:{}'.format(_sign_key, day), user_id)

    # Query a user's sign-in record for the current week, plus the total count
    async def get_user_week_sign_status(self, user_id: int, _sign_key='sign_in') -> tuple:
        now = datetime.datetime.now()
        # isoweekday(): Monday is 1, Sunday is 7 (weekday() would give Monday=0, Sunday=6)
        weekday = now.isoweekday()
        pipe = self.redis_db.pipeline()
        for d in range(weekday):
            check_day = str(now - datetime.timedelta(days=1) * d)[:10]
            pipe.getbit('{}:{}'.format(_sign_key, check_day), user_id)
        res = await pipe.execute()
        return res[::-1], sum(res)

    # Query a user's sign-in record for the current month, plus the total count
    async def get_user_month_sign_status(self, user_id: int, _sign_key='sign_in') -> tuple:
        now = datetime.datetime.now()
        # now.day is the day of the month, so we walk back from today to the 1st
        day = now.day
        pipe = self.redis_db.pipeline()
        for d in range(day):
            check_day = str(now - datetime.timedelta(days=1) * d)[:10]
            pipe.getbit('{}:{}'.format(_sign_key, check_day), user_id)
        res = await pipe.execute()
        return res[::-1], sum(res)




async_redis_client = AsyncRedisClient()
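The weekly sign-in lookup above combines one Bitmap key per day with the user ID as the bit offset. A minimal in-memory sketch (no Redis required; the `FakeBitmap` class and `week_keys` helper below are stand-ins I wrote purely for illustration) shows the key layout and the aggregation:

```python
import datetime

class FakeBitmap:
    """In-memory stand-in for Redis SETBIT/GETBIT, for illustration only."""
    def __init__(self):
        self.store = {}  # key -> set of offsets whose bit is 1

    def setbit(self, key, offset, value):
        bits = self.store.setdefault(key, set())
        if value:
            bits.add(offset)
        else:
            bits.discard(offset)

    def getbit(self, key, offset):
        return 1 if offset in self.store.get(key, set()) else 0

def week_keys(sign_key="sign_in", today=None):
    """One key per day from Monday up to today, e.g. 'sign_in:2024-01-15'."""
    today = today or datetime.date.today()
    weekday = today.isoweekday()  # Monday=1 ... Sunday=7
    days = [today - datetime.timedelta(days=d) for d in range(weekday)]
    return ["{}:{}".format(sign_key, d.isoformat()) for d in reversed(days)]

# Mark user 42 as signed in today and yesterday, then aggregate the week.
db = FakeBitmap()
today = datetime.date(2024, 1, 17)  # a Wednesday
db.setbit("sign_in:2024-01-17", 42, 1)
db.setbit("sign_in:2024-01-16", 42, 1)
statuses = [db.getbit(k, 42) for k in week_keys(today=today)]
print(statuses, sum(statuses))  # [0, 1, 1] 2
```

The real implementation batches the per-day `GETBIT` calls through a pipeline so the whole week costs a single round trip to Redis.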

2.3.1 Plugin class initialization and verification:

Let's start the app with the Redis asynchronous client as an example and run some verification tests:
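Before wiring the client into FastAPI, the wrapper methods can be smoke-tested without a live Redis by injecting `unittest.mock.AsyncMock` in place of `redis_db`. The `MiniClient` below is a trimmed illustration of the same wrapper pattern, not the full class from the scaffold:

```python
import asyncio
from unittest.mock import AsyncMock

class MiniClient:
    """Trimmed illustration of the wrapper pattern used by the scaffold's client."""
    def __init__(self, redis_db):
        self.redis_db = redis_db

    async def scard(self, key: str) -> int:
        return await self.redis_db.scard(key)

    async def getbit(self, key: str, offset: int) -> int:
        return await self.redis_db.getbit(key, offset)

async def main():
    # AsyncMock lets every attribute behave as an awaitable coroutine function
    fake = AsyncMock()
    fake.scard.return_value = 3
    fake.getbit.return_value = 1

    client = MiniClient(fake)
    assert await client.scard("tags") == 3
    assert await client.getbit("sign_in:2024-01-17", 42) == 1
    # verify the wrapper forwarded the call with the expected arguments
    fake.scard.assert_awaited_once_with("tags")
    print("verification passed")

asyncio.run(main())
```

In the real app the same checks would run against a client whose `redis_db` was created in a FastAPI startup event; mocking it first keeps the verification independent of the Redis server.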

3. Summary

A few closing notes on the scaffolding covered above: this summary touches on almost everything, within the limits of the article's length! If possible, follow-ups will continue to expand on it!!

The above is just my own study and practice notes, combined with my actual needs. If there are any clerical errors, criticism and corrections are welcome. Thank you!

END

Jianshu: www.jianshu.com/u/d6960089b…

Juejin: juejin.cn/user/296393…

Official account: search WeChat for 【children to a pot of wolfberry wine tea】

Learning and exchange are welcome | QQ group: 308711822