In section (2) of the previous chapter, some basic security mechanisms on the official website were supplemented, but when it came to the subsequent bonus JWT certification, I felt that novices would definitely be confused and confused. Including my first time to see the time, do not know what is to say, just hard scalp to chew the document! Ha ha
But after oneself masturbate once, also feel nothing but that thing! Nothing!
Here in order to avoid scattered topics, I here simply also add 2 of those to here! Compare and modify 2 again, seems to need review again!! Ha ha afraid of being audited by the big guy to call ④~~~
PS: Part and (2) of the content of the repetition, forgive me! I hope you can forgive me!
1 Fastapi Security Authorization (added about Oauth2 about basics)
May be relatively novice, the oAUTH2 authorization may still be a little confused, confused not afraid, afraid is that you do not understand, do not go to practice and experience. When you don’t understand, you should learn to search and think. I think no matter how difficult a problem is, there will be new discoveries and new ideas.
I suggest that you can directly baidu, others estimate that the description is more professional. My description here may be more lame!
1.1. What is OAuth2?
. What is OAuth2
The professional way to say it is:
OAuth (Open Authorization) is an open standard that allows users to authorize third-party mobile applications to access their information stored on another service provider without having to provide user names and passwords to third-party mobile applications or share all the content of their data.
1.2. Application scenarios of OAuth2?
You can see wechat login authorization, QQ login authorization and Alipay account login authorization everywhere. Pretty much!
1.3.OAuth2 authorizes several modes.
- Authorization code mode
- Implicit authorization mode
- Password mode (this is an example from the Fastapi website)
- Client credential mode
For the above several modes, how to distinguish?
- Authorization code mode
First of all, the authorization code mode is actually my common and most commonly used. The current use of wechat authorization to log in to third-party applications is this authorization mode. It is a relatively complete and strict authorization mode, which requires Code and relevant APP_secret to exchange our token.
- Obtain the token by code
- To support the refresh token
The best testimony about this mode is the documents on our official wechat authorization website.
WeChat authorization: developers.weixin.qq.com/doc/oplatfo…
Sequence diagram of wechat authorization:
The following text was copied from wechat’s official document:
Wechat OAuth2.0 authorization login currently supports authorization_code mode, which is suitable for application authorization with server side. The overall process of this mode is as follows: 1. A third party initiates a wechat authorized login request, and after a wechat user authorizes a third-party application, wechat will pull up the application or redirect it to a third-party website with code parameters of the authorized temporary note. 2. Exchange the access_token through API by adding AppID and AppSecret to the code parameter. 3. Invoke the access_token interface to obtain basic data resources or help users to perform basic operations.Copy the code
Interested or read the wechat authorization instructions! It feels like after you read it, you actually understand that you can do this kind of licensing model.
- Implicit authorization
Compared with the weight code mode, the callback URL directly carries the token. This mode is used in browser-based applications. The security of this mode is not very high, and tokens are easily intercepted. Refresh Token is not supported.
- Password mode
Really the official website is this mode, and the user name and password and in our operation document that is still clear text transfer, a little pit ah! While this mode supports Refresh Token, it is certainly not recommended for security purposes! Only the example of the official website is so, presumably for convenience!
- Client credential mode
This pattern allows tokens to be obtained directly based on client_ID and key without user involvement, and is suitable for back-end services that consume the type of API.
PS: The main point about refresh tokens is that we can use it to exchange for new tokens if our tokens have expired or expired and are within the normal range.
2 FastAPI OAuth2- Password authorization mode
As you can see from the documentation on the official website, their approach to security is to continue to explain the security issues after the discussion of dependencies. After a careful analysis of the official website example, its implementation does indeed rely on the dependency injection implementation we discussed in the previous section.
If you are not familiar with dependency injection, it is recommended to refer to the previous content, of course, you can also go directly to the official website to look!
PS: Because our OAuth2 username and password are submitted in the form, we need to install the library that supports the form:
pip install python-multipart
Copy the code
2.2 obtaining tokens through OAuth2PasswordBearer
import uvicorn from fastapi import FastAPI,Depends from fastapi.security import OAuth2PasswordBearer app = FastAPI() # Define a dependency injection object -- this dependency injection object returns a token # tokenUrl="token" which is the address to request the token, Format is /token oAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token") @app.get("/read_token/") async def read_token(token: str = Depends(oauth2_scheme)): return {"token": token} if __name__ == "__main__": Uvicorn. Run (' main: app, host = "127.0.0.1", the port = 8000, reload = True)Copy the code
Execute document interface operation (prompt no authentication) :
2.3 Shiny “License” button click and enter
The official translation is a shiny button !!!! I don’t think it’s shiny enough haha!
According to the official website, it is true that we input nothing drops!!
The OAuth2 header does not contain the information required by the OAuth2 header.
Authorization: Bearer of XXXXXX request headers
2.4 Use POSTMAN to submit request headers
No input:
Case with input:
Note: The interface returns a Token
As can be seen from the above interface, our Authorization: Bearer has been followed by the values of our tokens.
So the above demonstration request, in fact, is to show that if our interface does not pass the authentication information in the request header according to the standard protocol, it will not pass the drop! Our OAuth2PasswordBearer has its own validation of user input information.
2.5 TOKEN Processing Example
import uvicorn from fastapi import FastAPI, Security import OAuth2PasswordBearer app = fastAPI () def fake_decode_token TOEKEN return "token I am processed %s "+ token # Define a layer 1 dependency -- # define a dependency injection object -- this dependency injection object returns a token oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") # define the second layer of dependencies # define a dependency to handle our front-end incoming Authorization: Bearer value messages async def get_current_user(token: STR = Depends(oauth2_scheme)): user = fake_decode_token(token) return user @app.get("/read_token/") async def read_token(token: str = Depends(get_current_user)): return {"token": token} if __name__ == "__main__": Uvicorn. Run (' main: app, host = "127.0.0.1", the port = 8000, reload = True)Copy the code
The dependency flow above:
-
Get_current_user, depending on oauth2_scheme
-
Oauth2_scheme mainly receives tokens
-
Get_current_user: after receiving a token dependent on oauth2_scheme, call fake_decode_token for processing
Re-access interface:
The example on the official website is to define a model for injection in fake_decode_token, I didn’t follow it!
class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None def fake_decode_token(token): return User( username=token + "fakedecoded", email="[email protected]", full_name="John Doe" )Copy the code
But it’s the same principle! It’s just an injection of the user’s information and back!
2.6 Complete Token issuance and verification process based on OAuth2:
After searching for so long, I unexpectedly did not see the complete login process and the issuance of token process! Know after sleep before you know the original! The example on the official website is quite strange! How to obtain the Token and then talk about what signed! So the following is the process of issuing tokens based on the actual login username and password.
So a complete and brief example is as follows:
2.6.1 Example code:
import secrets import uvicorn from fastapi import FastAPI, Depends from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from pydantic import BaseModel from typing import Optional app = FastAPI() oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token") class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def fake_decode_token(token): # This doesn't provide any security at all # Check the next version user = get_user(fake_users_db, token) return user async def get_current_user(token: str = Depends(oauth2_scheme)): user = fake_decode_token(token) if not user: Raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="TOKEN authentication failed! TOKEN illegal!" , headers={"WWW-Authenticate": "Bearer"}, ) return user async def get_current_active_user(current_user: User = Depends(get_current_user)): if current_user. Disabled: raise HTTPException(status_code=400, detail=" The current User is unavailable!" Fake_users_db = {"xiaozhongtongxue": {"username": "xiaozhongtongxue", "full_name": "John Doe", "email": "[email protected]", "hashed_password": "xiaozhongtongxue=xiaozhongtongxue", "disabled": False, }, "xiaozhongtongxue2": { "username": "alice", "full_name": "Alice Wonderson", "email": "[email protected]", "hashed_password": "xiaozhongtongxue", "disabled": Def fake_hash_password(password: STR): return "xiaozhongtongxue=" + password @app.post("/oauth/token") async def login(form_data: OAuth2PasswordRequestForm = Depends()): User_dict = fake_users_db.get(form_data.username) print(" dict ", user_dict) if not user_dict: Raise HTTPException(status_code=400, detail=" This user information does not exist. ) user = UserInDB(**user_dict) print(" user output password: ", form_data.password) print(" user output password:" ", user.hashed_password) hashed_password = fake_hash_password(form_data.password) print("hashed_password ", hashed_password) if not hashed_password == user.hashed_password: Raise HTTPException(status_code=400, detail=" The user password does not match the one generated by fake_hash_password!" ) return {"access_token": user.username, "token_type": get_current_active_user@app. get("/users/me") async def read_users_me(current_user: User = Depends(get_current_active_user)): # return current_user if __name__ == "__main__": Uvicorn. Run (' main: app, host = "127.0.0.1", the port = 8000, reload = True)Copy the code
2.6.1 The following points should be paid attention to in obtaining the token:
You must submit using a form: JSON submission is useless:
Form submission:Documentation also provides submission only in form format
2.6.2 Normal Token Issuing Request:
2.6.3 Verification of Issued Token:
Incorrect TOKEN:
Correct TOKEN:
Login using action document:
The default address for finding it is:
/token HTTP/1.1" 404 Not Found
Copy the code
So add a multi-address support, then enter the same username and password after
Then verify our token-carrying interface:
At this point, exit the authentication document of the following operations to see: Manually release the Token to invalidate
Then access the interface that needs to carry tokens:
The whole process is preliminarily completed!
OAuth2 using (hash) passwords and JWT Bearer tokens
As for what JWT is here, Baidu bar! What all have! I will not repeat so much here!
Here are some additional features about JWT
- Small size, and therefore fast transmission
- The payload can contain basic authentication messages, but it is not recommended to write passwords.
- The server does not need to connect to the database to verify the validity of the information, and the payload can be customized for your application.
- Support for cross-domain authentication, which can often be applied to single sign-on requirements.
How to use JWT in flask:
Flask + Ant Vue (10) -JWT Authentication Middleware
3.1 Using the Pre-Environment
The website says: now that we have all the security processes in place, let’s make the application truly secure using JWT tokens and secure hashes!! It means that we add more ingredients based on the above!
Look at the official website of the example, in fact, if the analysis is to follow its dependence on the idea of words, or you can follow the story, reason out some clue out. However, I feel that the official website is relatively detailed.
According to my previous use of JWT, it is directly encapsulated into a class, one for generating our JTW and one for validating JWT. Since is the official website recommended example, the understanding or to understand first.
1. Install the JWT library recommended by the official website
PIP install cryptography -- Jose Install cryptography -- Jose install cryptographyCopy the code
Python-jose, because it provides all the functionality of PyJWT and some other functionality you might need later when integrating with other tools.
2. Install the password salted dependency library on the official website
PIP install passlib[bcrypt] PIP install bcryptCopy the code
Hashing passwords:
“Hash” means: to convert something (in this case, a password) into a sequence of bytes (just a word > string) that looks like gibberish.
Every time you pass in exactly the same content (exactly the same password), you get exactly the same garbled code.
But you can’t convert from garble back to password.
Passlib supports a number of secure hashing algorithms and utilities that work with them. The algorithm recommended on the website is Bcrypt.
3.2 JWT official website sample Guagua Shunteng series
The website says the steps are:
3.2.1 Create a utility function to hash the password from the user
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def get_password_hash(password):
return pwd_context.hash(password)
Copy the code
3.2.2 Create another utility function to verify that the received password matches the stored hash
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
Copy the code
Hashing a password
from passlib.context import CryptContext pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") def get_password_hash(password): return pwd_context.hash(password) def verify_password(plain_password, hashed_password): Return pwd_context.verify(plain_password, hashed_password) print ",get_password_hash('xiaozhong')) print(" ",get_password_hash('xiaozhong') print(" plain text password and hash password authentication --------- ") print(" ",verify_password('xiaozhong',get_password_hash('xiaozhong')) print(" ",verify_password('xiaozhong',get_password_hash('xiaozhong'))) output from the example above: For salt after password 1: $2 b $12 to $VuU. TAakvuYnQM/smhU0 / efASK5E/yMakuD89aVdkUn0CVHMiMpd2 for salt after password 2: $2 b $12 to $saNNZ9Mp6qt / 7 jm2qyqwmeq3vstu. TB1 XPdNe3M/qwm3EcdcLvlK cleartext passwords and validation of the hashed password after -- -- -- -- -- -- -- -- -- "get the password after salt 1: True for salt after password 2: TrueCopy the code
:PS from above see our one inscription can be for multiple different hash values!
3.2.3 Create a utility function to authenticate and return the user
def authenticate_user(fake_db, username: str, password: str):
user = get_user(fake_db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
Copy the code
The above tool functions are mainly used to verify the password of the current user after the user enters the user name and password.
3.2.4 Complete example explanation of the official website:
from datetime import datetime, timedelta from typing import Optional from fastapi import Depends, FastAPI, HTTPException, status from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm from jose import JWTError, jwt from passlib.context import CryptContext from pydantic import BaseModel # to get a string like this run: 32 # # openssl rand - hex key SECRET_KEY JWT's signature = "09 d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7 #" ACCESS_TOKEN_EXPIRE_MINUTES = 30 FAke_users_db = {" Johndoe ": {"username": "johndoe", "full_name": "John Doe", "email": "[email protected]", "hashed_password": "$2b$12$EixZaYVK1fsbw1ZfbX3OXePaWxn96p36WQoeG6Lruj3vjPGga31lW", "disabled": False, } } class TokenData(BaseModel): username: Optional[str] = None class User(BaseModel): username: str email: Optional[str] = None full_name: Optional[str] = None disabled: Optional[bool] = None class UserInDB(User): hashed_password: str pwd_context = CryptContext(schemes=["bcrypt"], Deprecated ="auto") oAuth2_scheme = OAuth2PasswordBearer(tokenUrl="token") app = FastAPI() # verify_password(plain_password, hashed_password): Return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password): return pwd_context.hash(password) def get_user(db, username: str): if username in db: user_dict = db[username] return UserInDB(**user_dict) def authenticate_user(fake_db, username: str, password: str): Username = get_user(fake_db, username); username = get_user(fake_db, username); If not verify_password(password, user.hashed_password): if not verify_password(password, user.hashed_password): Return user def create_access_token(data: dict, expires_delta: Optional[timedelta] = None): To_encode = data.copy() # expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta(minutes=15) to_encode.update({"exp": Expire}) # generate encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt async def get_current_user(token: str = Depends(oauth2_scheme)): # CredEntialS_Exception = HTTPException(status_code= status.http_401_Unauthorized, detail="Could not validate credentials", headers={"WWW-Authenticate": "Bearer"}, ) try: Decode (algorithms=[ALGORITHM], hashtag =[ALGORITHM], hashtag =[ALGORITHM], hashtag =[ALGORITHM]) str = payload.get("sub") if username is None: Raise credEntialS_exception raise credentialS_exception token_data = TokenData(username=username) except JWTError Raise credentialS_exception = get_user(fake_users_db, username=token_data.username) if user is None: # raise credEntialS_exception # Return user async def get_current_ACTIVE_user (current_user: User = Depends(get_current_user)): if current_user.disabled: raise HTTPException(status_code=400, Detail ="Inactive user") return current_user # Define the output model of the interface's returned Token class Token(BaseModel): access_token: STR token_type: str @app.post("/token", response_model=Token) async def login_for_access_token(form_data: OAuth2PasswordRequestForm = Depends ()) : "' login access to user authorization, our token certificate issued by the entrance API: param form_data: : return: Step 1: first is the check after OAuth2PasswordRequestForm dependency injection Step 2: after check, we start to our input user name and password from fake_users_db query and check Step 3: Verify user input information by authenticate_user - Check whether the name of the entered user exists from the current user dictionary, if so, return the entity of the user, - If there is no user information, the authentication fails - If there is user information, Start using verify_password to verify the user password and the salted password, and return the user information when the verification is successful. Step 4: After returning the validation information, we start generating our JWT - generating the expiration time of the JWT access_token_expires, User = AUTHENTICate_user (fake_users_db, form_data.username, form_data.password) if not user: raise HTTPException( status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect username or password", headers={"WWW-Authenticate": "Bearer"},) # define JWT expiration date access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) # write JWT user information Access_token = create_access_token(data={"sub": user.username}, expires_delta=access_token_expires ) return {"access_token": access_token, "token_type": "bearer"} @app.get("/users/me/", response_model=User) async def read_users_me(current_user: User = Depends(get_current_active_user)): Get ("/users/me/items/") async def read_own_items(current_user: User = Depends(get_current_active_user)): return [{"item_id": "Foo", "owner": current_user.username}] import uvicorn if __name__ == '__main__': # equal to the uvicorn command line uvicorn script name :app object start service: Uvicorn XXX :app --reload uvicorn.run('frun2:app', host="127.0.0.1", port=8000, debug=True, reload=True)Copy the code
Access documentation:
The process is actually the same as above, after login successfully, it means that my TOKEN is issued successfully, then, the API interface that needs TOKEN will automatically wear this TOKEN drop!
If user name and password:
Check the interface
Request interface:
3.2.5 summary
This example feels a bit convoluted, but it’s actually a bird like I used to use PYjwt! In my usual way of using Flask, I use it as a middleware package, as shown in this example:
Mport JWT import Datetime class Auth(): # signature secret = 'super-man$&123das% QZQ '# iss: @classmethod def create_token_by_data(CLS, sub= ", Data ={}, scopes=['open'], exp_time=60 * 60 * 24 * 5): "" generate the corresponding JWT token value: Param secret: indicates whether a null check is required. True indicates whether a null check is required. Param exp_time: indicates the token expiration time, calculated in seconds. Return False, {'token': '', 'Meg ':' void '} payload = {"iss": cls.iss, # iss: JWT signer. "Exp ": datetime.datetime.utcnow() + datetime.timedelta(days=0, seconds=exp_time), "Iat ": datetime.datetime.utcnow(), # when issued (UNIX time), whether used is optional; "Aud ": cls.aud, # the party that receives the JWT, whether to use it is optional; "Sub ": sub, # sub: the user for which the JWT is used is optional. "Scopes ": scopes, # user authorized scopes, use commas (,) to separate "data": data} # do not participate in the signature calculation if not sub: Payload. Pop ('sub') # token = jwt.encode(payload, cls.secret, algorithm='HS256') # token return True, str(token, 'utf-8') @classmethod def verify_bearer_token(cls, ischecck_sub=False, sub_in='', token=''): # if aud is used in token generation, try will be used in token verification. payload = jwt.decode(token, cls.secret, audience=cls.aud, algorithms=['HS256']) if ischecck_sub and sub_in ! = '': sub = payload['sub'] if sub ! = sub_in: return False, "invalid Token" if payload and ('data' in payload): # validation are returned to the corresponding part of the signature field information return True, content [' data '] else: raise JWT. InvalidTokenError except JWT. ExpiredSignatureError: Return False, "Token expired "except JWT.InvalidTokenError: return False, "Token invalid" except: Return False, "invalid Token" @classmethod def verify_bearer_token_state(CLS, ischecck_sub=False,sub_in=", Token = "): # if aud is used in token generation, try will be used in token verification. payload = jwt.decode(token, cls.secret, audience=cls.aud, algorithms=['HS256']) if ischecck_sub and sub_in ! = '': sub = payload['sub'] if sub ! = sub_in: return False, 1, "invalid Token" if payload and ('data' in payload): Return True, 0, payload['data'] else: Raise JWT. InvalidTokenError except JWT. ExpiredSignatureError: return False, 2, "the Token expired" is never dull JWT. InvalidTokenError: Except: return False, 1, "invalid Token"Copy the code
END
Let students | article | welcome learning exchange together 】 【 original 】 【 QQ: 308711822