As introduced in an earlier article, OAuth2 supports several authorization modes (grant types). See that article for the details.

A quick review of the OAuth2 authorization modes:

  • Authorization code mode
  • Implicit authorization mode
  • Password mode (the one used in the examples on the FastAPI website)
  • Client credentials mode

However, the examples on the official website mainly demonstrate the password mode, which is relatively simple. They do not cover the handling needed for code-based flows such as the authorization code mode.

For learning purposes, this time I practice using the OAuth2AuthorizationCodeBearer mechanism provided by FastAPI to handle our authentication.

PS: For the sake of simplicity, some of the handling is not detailed. For example, I will not describe in detail how the CODE of an authorized user is generated and deleted; the goal is mainly to understand the whole process.

The previous article already walked through the password mode in practice, so it is not repeated here.

See the password mode example.

2 Authorization Code Mode Flow analysis

The authorization code flow can be summarized in the following steps:

  • 1: Define the authorization address that the client requests in order to obtain the code parameter. The client must provide the necessary parameters.

  • 2: On the authorization page, the user decides whether to accept or reject the authorization.

    • If the user agrees, the server redirects to the callback address provided by the client, carrying the code we issued as a parameter.
    • If the user rejects, the server still redirects to the callback address provided by the client, and an error message is shown.
  • 3: If authorization was granted, the callback page uses the code and the other necessary parameters to obtain an access_token.

  • 4: Once the access_token has been obtained, it is used to access the other interfaces.

  • 5: If necessary, the access_token can be renewed through the refresh_token interface.

PS: The interfaces above are generally restricted to GET requests; other methods can be used if your requirements differ.
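Seen from the client side, the steps above can be sketched as follows. This is only a minimal illustration using the standard library: the host, the port and the callback address are assumptions, and the helper names (build_authorize_url, parse_callback, build_token_params) are hypothetical, not part of FastAPI.

```python
from urllib.parse import parse_qs, urlencode, urlsplit

# Assumption: the service runs locally; the client callback is hypothetical.
AUTHORIZE_URL = "http://127.0.0.1:8000/oauth2/authorize"
REDIRECT_URI = "http://127.0.0.1:9000/callback"


def build_authorize_url(appid: str, redirect_uri: str) -> str:
    """Step 1: the URL the client opens to request a code."""
    query = urlencode({
        "appid": appid,
        "redirect_uri": redirect_uri,
        "response_type": "code",
    })
    return f"{AUTHORIZE_URL}?{query}"


def parse_callback(callback_url: str) -> dict:
    """Step 2: read either the code or the error from the callback URL."""
    params = parse_qs(urlsplit(callback_url).query)
    if "error" in params:
        return {"ok": False, "error": params["error"][0]}
    if "code" not in params:
        return {"ok": False, "error": "missing_code"}
    return {"ok": True, "code": params["code"][0]}


def build_token_params(appid: str, secret: str, code: str) -> dict:
    """Step 3: the parameters sent (from the server side) to obtain the access_token."""
    return {
        "appid": appid,
        "secret": secret,
        "code": code,
        "grant_type": "authorization_code",
    }
```

Steps 4 and 5 then only differ in which address is called and which token is sent.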

3 Authorization code mode operation

3.1 Determining authorization Scheme Information

First, a few questions need to be settled when defining the authorization scheme:

1: authorizationUrl: the address of the authorization page request (this address must handle both consent and refusal)

2: tokenUrl: the address for obtaining the access_token once the user has agreed to authorize

3: scopes: which authorization scopes are available in the scheme

4: refreshUrl: the address used to refresh the access_token

The following object defines the authorization scheme:

# Define the authentication scheme
oauth2_scheme = OAuth2AuthorizationCodeBearer(
    # URL of the authorization page
    authorizationUrl='/oauth2/authorize',
    # URL that issues the access_token
    tokenUrl="access_token",
    # URL used to refresh the access_token
    refreshUrl='refreshUrl',
    # Scopes offered by this scheme
    scopes={
        "get_admin_info": "Get administrator user information",
        "del_admin_info": "Delete administrator user information",
        "get_user_info": "Get user information",
        "get_user_role": "Get user role information",
        "get_user_permission": "Get user permission information",
    },
)

3.2 Defining the authorizationUrl address

Note that this endpoint must declare the necessary input parameters; which ones are required can be worked out from the parameters that the interactive API docs submit.

Of course, this parameter set is only what the interactive docs need. If you have additional requirements, you can define your own parameters.

The generally necessary parameters are:

  • appid: the unique ID assigned to a client
  • redirect_uri: the callback address provided by the client, used both when a code is issued and when authorization is rejected; that is, the address redirected to after authorization
  • scope: the permissions applied for (generally not needed here. The interactive docs use it to say which permissions are requested, but in our design for external clients the scope is not chosen by the client, so this parameter is not needed)
  • response_type: the return type. The default value is code
@app.get("/oauth2/authorize")
async def authorizationUrl(appid: Optional[str], redirect_uri: Optional[str], scopes: Optional[str] = None, response_type: Optional[str] = 'code'):
    # Validate the client behind this appid, then produce the code
    user = authenticate_user(fake_users_db, appid)
    if not user:
        raise HTTPException(status_code=400, detail="Partner APPID does not exist")
    # Assume the user accepts the authorization request by default.
    # A real web flow would show a consent page through another interface; it is omitted here.
    # Return a code for this partner. This is only a demonstration:
    # a real implementation must generate a different code each time, bound one-to-one to the appid.
    code = 'SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS'
    # Redirect to the callback address
    return RedirectResponse(url=redirect_uri + "?code=" + code)
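The handler builds the callback URL by string concatenation, which gives a malformed URL when redirect_uri already carries a query string, and it has no branch for a refused authorization. A minimal sketch of both redirects using only the standard library follows; the helper names are hypothetical, and "access_denied" is the error value that RFC 6749 defines for a refused request.

```python
from typing import Dict
from urllib.parse import parse_qsl, urlencode, urlsplit, urlunsplit


def add_query_params(url: str, extra: Dict[str, str]) -> str:
    """Return url with the extra parameters merged into its query string."""
    parts = urlsplit(url)
    query = dict(parse_qsl(parts.query, keep_blank_values=True))
    query.update(extra)
    return urlunsplit(parts._replace(query=urlencode(query)))


def success_redirect(redirect_uri: str, code: str) -> str:
    """Callback URL used when the user agrees to the authorization."""
    return add_query_params(redirect_uri, {"code": code})


def denied_redirect(redirect_uri: str) -> str:
    """Callback URL used when the user rejects the authorization."""
    return add_query_params(redirect_uri, {"error": "access_denied"})
```

Either result could be passed as the url argument of RedirectResponse.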

PS: For convenience, the page where the user agrees to or refuses the authorization is not shown here. Use your imagination!
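The fixed code in the handler is for demonstration only. A minimal sketch of generating and deleting one-time codes bound to an appid could look like this. The in-memory dictionary, the 5-minute lifetime and the function names are all assumptions made for illustration; a real service would likely keep the codes in a shared store.

```python
import secrets
import time
from typing import Dict, Optional, Tuple

CODE_TTL_SECONDS = 300  # assumption: a code is valid for 5 minutes

# code -> (appid it was issued for, expiry timestamp)
_codes: Dict[str, Tuple[str, float]] = {}


def issue_code(appid: str, now: Optional[float] = None) -> str:
    """Generate a random code and remember which appid it belongs to."""
    now = time.time() if now is None else now
    code = secrets.token_urlsafe(32)
    _codes[code] = (appid, now + CODE_TTL_SECONDS)
    return code


def consume_code(code: str, appid: str, now: Optional[float] = None) -> bool:
    """Delete the code on first use; accept it only for the right appid and before expiry."""
    now = time.time() if now is None else now
    entry = _codes.pop(code, None)
    if entry is None:
        return False
    owner, expires_at = entry
    return owner == appid and now <= expires_at
```

Because consume_code removes the entry before checking it, a code can never be exchanged twice, even when the first attempt fails.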

3.3 Request an access_token based on the code parameter passed in to the callback page address

This is where we define the access_token address, which is responsible for issuing the access_token.

The necessary parameters here are mainly:

  • appid: the unique ID assigned to a client
  • secret: the secret key assigned to the client
  • code: the code extracted from the authorization callback
  • grant_type: default value authorization_code

Note that we need to decide which scopes the issued token carries. Here they are hard-coded by default (to keep the demonstration simple).

PS2: On security: the exchange of the code for a token must be made from the server side, so that the secret is not exposed. This is the key point! Both the secret and the access_token are highly sensitive, and if the exchange happens on the client side they are completely exposed. The assigned secret is bound to the appid and is used together with it for authorization.

@app.get("/access_token")
async def access_token(appid: Optional[str], secret: Optional[str], code: Optional[str], grant_type: Optional[str] = 'authorization_code'):
    # Issue the access_token and the refresh_token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    # Scopes are hard-coded for the demonstration
    scopes = ['get_admin_info', 'del_admin_info']
    wxopenid = 'xiaozhongtongxue'
    access_token = create_access_token(
        data={"sub": wxopenid, "scopes": scopes},
        expires_delta=access_token_expires,
    )
    # Lifetime of the refresh_token
    refresh_access_token_expires = timedelta(minutes=REFRRSH_ACCESS_TOKEN_EXPIRE_MINUTES)
    # Issue the refresh_token
    refresh_token = create_access_token(
        data={"sub": appid, "scopes": scopes},
        expires_delta=refresh_access_token_expires,
    )
    return {
        "access_token": access_token,
        # Lifetime of the access_token, in seconds
        "expires_in": int(access_token_expires.total_seconds()),
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "openid": wxopenid,
        "scope": "SCOPE",
    }
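The demonstration endpoint issues a token without rejecting a wrong secret or code. A minimal sketch of the server-side check described in PS2 follows. The CLIENT_SECRETS registry and both function names are hypothetical, the demo secret mirrors the value hard-coded in the complete code, and the error strings are the ones RFC 6749 defines for token requests.

```python
import hmac
from typing import Dict, Optional

# Hypothetical registry: appid -> secret assigned to that client.
CLIENT_SECRETS: Dict[str, str] = {"xiaozhongtongxue": "xiaozhongtongxue"}


def verify_client(appid: str, secret: str) -> bool:
    """Check that the secret matches the one bound to this appid."""
    expected = CLIENT_SECRETS.get(appid)
    if expected is None:
        return False
    # compare_digest avoids leaking information through timing differences
    return hmac.compare_digest(expected.encode(), secret.encode())


def validate_token_request(appid: str, secret: str, code: str,
                           grant_type: str, expected_code: str) -> Optional[str]:
    """Return an OAuth2 error string, or None when the request is acceptable."""
    if grant_type != "authorization_code":
        return "unsupported_grant_type"
    if not verify_client(appid, secret):
        return "invalid_client"
    if not hmac.compare_digest(code.encode(), expected_code.encode()):
        return "invalid_grant"
    return None
```

An endpoint could call validate_token_request first and answer with a 400 response when it returns an error string.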

3.4 Requesting a protected interface with the access_token

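The interfaces in this example are /oauth2/authorize, /access_token, /status/ and the five /api/v1/ interfaces, one per scope.

After obtaining the access_token, we can begin testing permissions.

The scopes written into the token above are get_admin_info and del_admin_info.

Everything else is off limits, so:

Accessing an interface the token has permission for, for example /api/v1/get_admin_info, succeeds and returns the user information.

Accessing an interface the token has no permission for, for example /api/v1/get_user_info, is rejected with a 401 response and the detail "Not enough permissions".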

Issuing and verifying the token works in roughly the same way as in the username and password mode!
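The permission check itself can be illustrated without running the server. The sketch below is a simplified model, not FastAPI code: it assumes that every /api/v1/ interface inherits the get_admin_info requirement declared on get_current_active_user in addition to its own scope, which is how Security scopes accumulate along a dependency chain. The names BASE_SCOPES, ENDPOINT_SCOPES, missing_scopes and can_access are hypothetical.

```python
from typing import Dict, Iterable, List

# Scopes written into the token by the /access_token endpoint.
ISSUED_SCOPES = ["get_admin_info", "del_admin_info"]

# Scope declared on get_current_active_user, inherited by every interface below.
BASE_SCOPES = ["get_admin_info"]

# Scope declared on each interface itself.
ENDPOINT_SCOPES: Dict[str, List[str]] = {
    "/api/v1/get_admin_info": ["get_admin_info"],
    "/api/v1/del_admin_info": ["del_admin_info"],
    "/api/v1/get_user_info": ["get_user_info"],
    "/api/v1/get_user_role": ["get_user_role"],
    "/api/v1/get_user_permission": ["get_user_permission"],
}


def missing_scopes(required: Iterable[str], granted: Iterable[str]) -> List[str]:
    """Return the required scopes that the token does not carry."""
    granted_set = set(granted)
    return [scope for scope in required if scope not in granted_set]


def can_access(path: str, granted: Iterable[str]) -> bool:
    """True when the token carries every scope the interface requires."""
    required = BASE_SCOPES + ENDPOINT_SCOPES[path]
    return not missing_scopes(required, granted)
```

With the issued scopes, can_access is true only for the two admin interfaces; the other three would be answered with "Not enough permissions".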

The complete example code:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
   File name :         quanxianyu
   File description :  description
   Author :            小钟同学
-------------------------------------------------
   Change log -
-------------------------------------------------
"""
from datetime import datetime, timedelta
from typing import List, Optional
from fastapi import Depends, FastAPI, HTTPException, Security, status
from fastapi.security import (OAuth2PasswordRequestForm, SecurityScopes, OAuth2AuthorizationCodeBearer, OAuth2PasswordRequestFormStrict)
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel, ValidationError
from fastapi.responses import RedirectResponse

SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRRSH_ACCESS_TOKEN_EXPIRE_MINUTES = 70

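oauth2_scheme = OAuth2AuthorizationCodeBearer(
    # URL of the authorization page
    authorizationUrl='/oauth2/authorize',
    # URL of the interface that handles the token request
    tokenUrl="access_token",
    # URL used to obtain a new token
    refreshUrl='refreshUrl',
    # Scopes shown in the interactive docs; they tie in with the scopes each interface requires below
    scopes={
        "get_admin_info": "Get administrator user information",
        "del_admin_info": "Delete administrator user information",
        "get_user_info": "Get user information",
        "get_user_role": "Get user role information",
        "get_user_permission": "Get user permission information",
    }
)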

# Password hashing scheme
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# User table
fake_users_db = {
    "xiaozhongtongxue": {
        "username": "xiaozhongtongxue",
        "full_name": "xiaozhongtongxue",
        "email": "[email protected]",
        "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW",
        "disabled": False
    }
}


class TokenData(BaseModel):
    username: Optional[str] = None
    scopes: List[str] = []


class User(BaseModel):
    username: str
    email: Optional[str] = None
    full_name: Optional[str] = None
    disabled: Optional[bool] = None


class UserInDB(User):
    hashed_password: str


# Define the app service object
app = FastAPI()


# Hash a password (salted)
def get_password_hash(password):
    return pwd_context.hash(password)


# Authenticate the user (this demo only checks that the user exists)
def authenticate_user(fake_db, username: str):
    user = get_user(fake_db, username)
    if not user:
        return False
    return user


# Look up the user in the dictionary defined above and return the user entity
def get_user(db, username: str):
    if username in db:
        user_dict = db[username]
        return UserInDB(**user_dict)


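# Create the token issued to the user after authorization
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    '''
    Issue a token
    :param data: contains the user information and the scopes granted to the token
    :param expires_delta: lifetime of the token
    :return: the encoded JWT
    '''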
    to_encode = data.copy()
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(minutes=15)
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


async def get_current_user(security_scopes: SecurityScopes, token: str = Depends(oauth2_scheme)):
    print("Scopes required by the current security scheme:", security_scopes.scope_str)
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"

    # Exception raised when the credentials cannot be validated
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    print("Token carried by the request:", token)
    try:
        # Decode the token and extract its claims
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        wxopenid: str = payload.get("sub")
        if wxopenid is None:
            raise credentials_exception

        print("Current userid:", wxopenid)
        token_scopes = payload.get("scopes", [])
        print("Scopes contained in the user's token:", token_scopes)
        token_data = TokenData(scopes=token_scopes, username=wxopenid)
        print("token_data", token_data)
    except (JWTError, ValidationError):
        raise credentials_exception

    # Verify the user against the database once more
    user = get_user(fake_users_db, username=token_data.username)
    if user is None:
        raise credentials_exception

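    # Then check every required scope
    print("All security_scopes required by this request:", security_scopes.scopes)
    for scope in security_scopes.scopes:
        # Compare against the scopes carried by the user's token
        if scope not in token_data.scopes:
            # If a required scope is missing, raise a "not enough permissions" error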
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Not enough permissions",
                headers={"WWW-Authenticate": authenticate_value},
            )
    return user


# Note: a dependency can declare required scopes too, alone or combined. Any interface that depends on this one cannot be accessed without the scope declared here!
async def get_current_active_user(current_user: User = Security(get_current_user, scopes=["get_admin_info"])):
    print("Current user:", current_user)
    # Check whether the user has been disabled; if not, continue
    if current_user.disabled:
        raise HTTPException(status_code=400, detail="Inactive user")
    return current_user


@app.get("/access_token")
async def access_token(appid: Optional[str], secret: Optional[str], code: Optional[str], grant_type: Optional[str] = 'authorization_code'):
    # Issue the access_token and the refresh_token
    access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    scopes = ['get_admin_info', 'del_admin_info']
    # print('Scopes written into the token:', scopes)

    # Validate the code and the appid
    if code == 'SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS' and appid == 'xiaozhongtongxue' and secret == 'xiaozhongtongxue':
        pass
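        # Demonstration only: the real handling depends on your business logic

    # The user is checked again when the token is used, so the username is passed directly here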
    username = 'xiaozhongtongxue'

    access_token = create_access_token(
        data={"sub": username, "scopes": scopes},
        expires_delta=access_token_expires,
    )
    # Lifetime of the refresh_token
    refresh_access_token_expires = timedelta(minutes=REFRRSH_ACCESS_TOKEN_EXPIRE_MINUTES)
    # Issue the refresh_token
    refresh_token = create_access_token(
        data={"sub": appid, "scopes": scopes},
        expires_delta=refresh_access_token_expires,
    )

    return {
        "access_token": access_token,
        # Lifetime of the access_token, in seconds
        "expires_in": int(access_token_expires.total_seconds()),
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "userid": username,
        "scope": "SCOPE"
    }


# This interface must be a GET request
@app.get("/oauth2/authorize")
async def authorizationUrl(appid: Optional[str], redirect_uri: Optional[str], scopes: str = None, response_type: Optional[str] = 'code'):
    # Validate the client behind this appid, then produce the code
    user = authenticate_user(fake_users_db, appid)
    if not user:
        raise HTTPException(status_code=400, detail="Partner APPID does not exist")

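    # Assume the user accepts the authorization request by default. For a web page, another interface would be needed to handle the consent step

    # Default: authorization granted; the consent page is omitted

    # Return a code assigned to this specific partner
    # Note: this is only a demonstration. A real implementation must generate a different code each time, bound one-to-one to the appid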
    code = 'SSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSSS'
    # Redirect to the callback address
    return RedirectResponse(url=redirect_uri + "?code=" + code)


@app.get("/status/")
async def read_system_status():
    return {"status": "ok"}


@app.get("/api/v1/get_admin_info", response_model=User)
async def get_admin_info(current_user: User = Security(get_current_active_user, scopes=["get_admin_info"])):
    return current_user


@app.get("/api/v1/del_admin_info", response_model=User)
async def del_admin_info(current_user: User = Security(get_current_active_user, scopes=["del_admin_info"])):
    return current_user


@app.get("/api/v1/get_user_info", response_model=User)
async def get_user_info(current_user: User = Security(get_current_active_user, scopes=["get_user_info"])):
    return current_user


@app.get("/api/v1/get_user_role", response_model=User)
async def get_user_role(current_user: User = Security(get_current_active_user, scopes=["get_user_role"])):
    return current_user


@app.get("/api/v1/get_user_permission", response_model=User)
async def get_user_permission(current_user: User = Security(get_current_active_user, scopes=["get_user_permission"])):
    return current_user


import uvicorn

if __name__ == '__main__':
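    # Equivalent to starting the service from the command line: uvicorn <script name>:app --reload
    # The debug argument is dropped because recent uvicorn releases no longer accept it
    uvicorn.run('quanxianyu-code:app', host="127.0.0.1", port=8000, reload=True)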


At the end

Simple notes! For reference only!

END

Jianshu: www.jianshu.com/u/d6960089b…

Juejin: juejin.cn/user/296393…

WeChat public account: search for [children to a pot of wolfberry wine tea]

小钟同学 | Original article | Learning and exchange welcome | QQ: 308711822