Caching is an essential part of most applications, and for caching our go-to store is usually Redis. Asynchronous client-side libraries for it, however, are still relatively few.
1: Redis asynchronous client libraries
The main libraries supporting asynchronous clients are:
- aioredis
- asyncio_redis
- aredis (seems pretty good too; friendlier and easier to use than aioredis!)
This article mainly looks at aioredis. The official site says 2.0.0 has been released with a long list of changes, yet the version most people actually want is still 1.3.1, which has been stuck there since 2019!
2: Simple use of aioredis
1: installation
```shell
pip install aioredis
```
The official documentation also recommends installing:
```shell
pip install hiredis
```
hiredis is basically an accelerator: a faster protocol parser!
Installing aioredis also pulls in the async-timeout library.
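async-timeout exists to bound how long an async operation may wait. Since a live Redis is not needed to see the idea, here is a server-free sketch of the same timeout pattern using only the stdlib's `asyncio.wait_for` (a stand-in for `async_timeout.timeout`; the `slow_command` name is illustrative):

```python
import asyncio

async def slow_command():
    # stands in for a Redis call that takes too long
    await asyncio.sleep(10)
    return "value"

async def main():
    try:
        # give up after 0.1 seconds instead of hanging
        return await asyncio.wait_for(slow_command(), timeout=0.1)
    except asyncio.TimeoutError:
        return None

result = asyncio.run(main())
print(result)  # None: the call was cancelled after 0.1s
```

The same bounded-wait behavior is what async-timeout provides to aioredis internally.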
2: Simplest example
2.1 A modified version of the example provided on the official website:
```python
import asyncio
import aioredis

async def main():
    redis = aioredis.from_url("redis://localhost")
    await redis.set("my-key", "value")
    value = await redis.get("my-key")
    print(value)

asyncio.run(main())
```
It is a pity, but the example above does not run; from_url is apparently not available in the installed version!
So we create the client in a different way; several creation methods are available.
Example: change from_url to create_redis:
```python
import asyncio
import aioredis

async def main():
    redis = await aioredis.create_redis('redis://127.0.0.1:6379/0')
    await redis.set("my-key", "value")
    value = await redis.get("my-key")
    print(value)

asyncio.run(main())
```
The above example outputs:

```
b'value'
```
The result above is of type bytes, which is a bit inconvenient! Change it again to return a string directly:
```python
import asyncio
import aioredis

async def main():
    redis = await aioredis.create_redis('redis://127.0.0.1:6379/0?encoding=utf-8')
    await redis.set("my-key", "value")
    value = await redis.get("my-key")
    print(value)

asyncio.run(main())
```
Now the result is a plain string! The only difference is the encoding parameter appended to the connection URL.
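If you would rather keep the default bytes replies instead of passing encoding in the URL, decoding by hand works just as well; a tiny illustration of the two forms:

```python
raw = b'value'               # what redis.get returns without encoding set
text = raw.decode('utf-8')   # what it returns with ?encoding=utf-8
print(text)  # value
```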
2.2 Other Methods of Creating a Client
```python
# Single connection (low level)
redis_cli = await aioredis.create_connection(('localhost', 6379), loop=loop)

# Connection pool
redis_cli = await aioredis.create_pool(('localhost', 6379), minsize=3, maxsize=20, loop=loop)

# High-level Redis interface
redis_cli = await aioredis.create_redis(('localhost', 6379), loop=loop)

# With a password
redis_cli = await aioredis.create_redis(address, password=password)

# Without a password
redis_cli = await aioredis.create_redis(address)

# Pool created from a URL, with password and encoding
redis_cli = await aioredis.create_redis_pool(f"redis://:{password}@127.0.0.1:6379/0?encoding=utf-8")
```
3: Hash operations:
- Hash operation: hmset_dict
- Pipeline operation: pipeline
Examples from the official website:
```python
#!/usr/bin/env python
# -*- coding: utf-8 -*-
import asyncio
import aioredis

async def main():
    redis = await aioredis.create_redis('redis://127.0.0.1:6379/0?encoding=utf-8')
    await redis.set("my-key", "value")
    value = await redis.get("my-key")
    print(value)
    print("HASH dictionary operation")
    await redis.hmset_dict("hash", key1="value1", key2="value2", key3=123)
    result = await redis.hgetall("hash", encoding="utf-8")
    # assert result == {"key1": "value1", "key2": "value2", "key3": "123"}
    # note that Redis returns the int as a string
    print(result)

asyncio.run(main())
```
Output:

```
value
HASH dictionary operation
{'key1': 'value1', 'key2': 'value2', 'key3': '123'}
```
The resulting hash can also be inspected directly in Redis.
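The commented assertion above hinges on one detail: Redis stores every hash field value as a string, so the int 123 comes back as "123". That conversion can be sketched locally in plain Python, no server needed:

```python
def as_redis_hash(mapping: dict) -> dict:
    # Redis hashes hold only strings, so ints are stored in their str form
    return {key: str(value) for key, value in mapping.items()}

result = as_redis_hash({"key1": "value1", "key2": "value2", "key3": 123})
print(result)  # {'key1': 'value1', 'key2': 'value2', 'key3': '123'}
```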
4: Queue-related operations:
```python
import asyncio
import aioredis

async def main():
    redis = await aioredis.create_redis('redis://127.0.0.1:6379/0?encoding=utf-8')
    sd = await redis.rpush('one:1', 'hello, 222')
    print("===============1", sd)
    sd = await redis.lpush('one:2', 'hello again')
    print("===============2", sd)

asyncio.run(main())
```
The final result prints the new list lengths.
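rpush and lpush return the new length of the list, which is what the printed sd values show. The semantics can be mimicked locally with a deque (a stand-in for illustration only, not aioredis):

```python
from collections import deque

class MiniRedisList:
    """Local stand-in mirroring RPUSH/LPUSH return values (the new length)."""
    def __init__(self):
        self.items = deque()

    def rpush(self, *values):
        self.items.extend(values)       # append on the right
        return len(self.items)

    def lpush(self, *values):
        for v in values:
            self.items.appendleft(v)    # prepend on the left
        return len(self.items)

lst = MiniRedisList()
print(lst.rpush('a', 'b'))   # 2
print(lst.lpush('c'))        # 3
print(list(lst.items))       # ['c', 'a', 'b']
```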
5. Examples of pipeline operations:
The example from the official website unfortunately does not work! Worse, it fails without raising any error or giving any hint of what went wrong!
```python
import asyncio
import aioredis
from typing import Tuple

async def main():
    redis = await aioredis.create_redis('redis://127.0.0.1:6379/0?encoding=utf-8')

    async def get_with_ttl(key: str) -> Tuple[int, str]:
        p = redis.pipeline()
        p.ttl(key)
        p.get(key)
        return await p.execute()

    print('1111111111111')
    sd = await get_with_ttl('my-key2')
    print('2222222222')
    print(sd)

asyncio.run(main())
```
Chained calls are not supported:
The pipeline object does not support chaining such as p.get("foo").incr("bar"); the calls must be split onto separate lines, and nothing tells you when you get it wrong!
The sample provided on the official website simply does not run!
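The pipeline's behavior, queue commands first and send them all on execute(), can be sketched with a toy class. Note how each queuing call returns None rather than the pipeline, which is exactly why chained calls like p.get("foo").incr("bar") fail silently (this is a local illustration, not aioredis internals):

```python
class ToyPipeline:
    def __init__(self, store):
        self.store = store
        self.commands = []

    def ttl(self, key):
        # queues the command; returns None, so chaining is impossible
        self.commands.append(('ttl', key))

    def get(self, key):
        self.commands.append(('get', key))

    def execute(self):
        # replay all queued commands in order and collect their results
        results = []
        for op, key in self.commands:
            if op == 'get':
                results.append(self.store.get(key))
            else:
                results.append(-1)  # toy: no expirations tracked
        return results

p = ToyPipeline({'my-key2': 'value'})
p.ttl('my-key2')
p.get('my-key2')
print(p.execute())  # [-1, 'value']
```

Trying `p.get('my-key2').incr('bar')` here would raise AttributeError on None, which is the same structural reason the real pipeline cannot be chained.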
6: Publish and subscribe
6.1 Subscribe to specific channels for consumption
Subscriber side:
```python
import asyncio
import aioredis
from aioredis import Channel

async def main():
    redis = await aioredis.create_redis('redis://127.0.0.1:6379/0?encoding=utf-8')
    ps = await redis.pubsub_channels()
    print("2", ps)
    channels = await redis.subscribe('receive')

    async def reader(channel: Channel) -> None:
        message = await channel.wait_message()
        print(message)
        data = await channel.get()
        print(data.decode('utf-8'))

    for channel in channels:
        print('sssssssssss', channel)
        await reader(channel)

asyncio.run(main())
```
Publisher side:
```python
import asyncio
import aioredis

async def main():
    redis = await aioredis.create_redis('redis://127.0.0.1:6379/0?encoding=utf-8')
    sd = await redis.publish('receive', 'a test message')
    print(sd)

asyncio.run(main())
```
Start the subscriber first, then start the publisher; at that point the subscriber receives our data.
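The subscriber's wait-then-get loop maps naturally onto an asyncio.Queue, which lets the pattern be shown without a server (names here are illustrative, not aioredis APIs):

```python
import asyncio

async def publisher(channel: asyncio.Queue):
    # stands in for redis.publish('receive', ...)
    await channel.put('a test message')

async def reader(channel: asyncio.Queue) -> str:
    # stands in for channel.wait_message() followed by channel.get()
    return await channel.get()

async def main():
    channel = asyncio.Queue()
    # start the reader first, then publish, as in the aioredis example
    reader_task = asyncio.create_task(reader(channel))
    await publisher(channel)
    return await reader_task

received = asyncio.run(main())
print(received)  # a test message
```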
3: Cache processing with FastAPI
3.1 A simple way to use it in FastAPI
Usually, when using it inside a framework, we create a connection pool and wire it in at startup.
The specific steps are:
3.1.1 Creating the redis_pool object
```python
from aioredis import Redis, create_redis_pool

async def get_redis_pool() -> Redis:
    # password is assumed to be defined elsewhere in the configuration
    redis = await create_redis_pool(f"redis://:{password}@127.0.0.1:6379/0?encoding=utf-8")
    return redis
```
3.1.2 Registering it on the FastAPI app object
Registration is generally done via event listeners: create the pool on startup, and listen for the service shutdown to release the connection.
```python
@app.on_event('startup')
async def startup_event():
    """Create the Redis pool on startup."""
    app.state.redis = await get_redis_pool()

@app.on_event('shutdown')
async def shutdown_event():
    """Release the connection on shutdown.

    :return:
    """
    app.state.redis.close()
    await app.state.redis.wait_closed()
```
The event registration above can also be written as follows:
```python
app.add_event_handler("startup", create_redis_conn_handler(app))
app.add_event_handler("shutdown", create_redis_disconnect_handler(app))
```
Then associate the handlers with the corresponding factory functions:
```python
def create_redis_conn_handler(app: FastAPI):
    async def start_app():
        pool = await create_redis_pool("redis://:6379")
        app.state.redis_conn = RedisConn(pool)
    return start_app

def create_redis_disconnect_handler(app: FastAPI):
    async def stop_app():
        await app.state.redis_conn.disconnect()
    return stop_app
```
3.1.3 Method of use
Since FastAPI's Request context carries the app object, we can obtain the current app from the request and then grab the Redis instance from it for our operations:
```python
# redis write
await request.app.state.redis.set("nihao", num)
# redis read
v = await request.app.state.redis.get("nihao")
```
4: Using the open-source fastapi-cache library
flask-caching supports decorator-based caching and related features in Flask, which is nice for those who need it! fastapi-cache brings the same style of caching to FastAPI.
The first step is to install the dependency. fastapi-cache's async support depends on aioredis, so using it requires aioredis to be installed as well. It also supports memcached as a cache backend if you need one.
```shell
pip install fastapi-cache2
```
4.1 fastapi-cache official example
Here I’ll just post the sample code from its official website:
```python
import aioredis
from fastapi import FastAPI
from starlette.requests import Request
from starlette.responses import Response

from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache

app = FastAPI()

@cache()
async def get_cache():
    return 1

@app.get("/")
@cache(expire=60)
async def index(request: Request, response: Response):
    return dict(hello="world")

@app.on_event("startup")
async def startup():
    redis = await aioredis.create_redis_pool("redis://localhost", encoding="utf8")
    FastAPICache.init(RedisBackend(redis), prefix="fastapi-cache")
```
If you need to customize the cache key used by the decorator, you can also do the following:
```python
def my_key_builder(
    func,
    namespace: Optional[str] = "",
    request: Request = None,
    response: Response = None,
    *args,
    **kwargs,
):
    prefix = FastAPICache.get_prefix()
    cache_key = f"{prefix}:{namespace}:{func.__module__}:{func.__name__}:{args}:{kwargs}"
    return cache_key

@app.get("/")
@cache(expire=60, coder=JsonCoder, key_builder=my_key_builder)
async def index(request: Request, response: Response):
    return dict(hello="world")
```
4.2 fastapi-cache source code analysis
The point of reading the source is mainly to understand how the decorator wraps our asynchronous coroutine functions. Let's find out!
4.2.1 FastAPICache Analysis
First up is FastAPICache, which handles the management and configuration of the cache object.
The configurable options can be seen from its init:
```python
@classmethod
def init(
    cls,
    backend,
    prefix: str = "",
    expire: int = None,
    coder: Coder = JsonCoder,
    key_builder: Callable = default_key_builder,
):
```
- backend: the cache backend to use, a Redis or memcached object
- prefix: prefix for cache keys
- expire: global expiration time setting
- coder: how to serialize and deserialize cached data
- key_builder: a custom key-generation function, which must be a callable! The default key generation lives in key_builder.py.
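The coder option boils down to a paired encode/decode. A minimal sketch of what a JSON coder roughly looks like (simplified for illustration; the library's actual JsonCoder may differ in detail):

```python
import json

class MiniJsonCoder:
    @classmethod
    def encode(cls, value) -> bytes:
        # serialize to bytes before handing the value to the backend
        return json.dumps(value).encode()

    @classmethod
    def decode(cls, value):
        # restore the original Python object on a cache hit
        return json.loads(value)

payload = {"hello": "world"}
stored = MiniJsonCoder.encode(payload)
print(MiniJsonCoder.decode(stored))  # {'hello': 'world'}
```

Any coder that round-trips values this way can be plugged into init.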
4.2.2 key_builder: key generation for cache entries
Specific source code:
```python
import hashlib
from typing import Optional

from starlette.requests import Request
from starlette.responses import Response

def default_key_builder(
    func,
    namespace: Optional[str] = "",
    request: Optional[Request] = None,
    response: Optional[Response] = None,
    args: Optional[tuple] = None,
    kwargs: Optional[dict] = None,
):
    from fastapi_cache import FastAPICache

    prefix = f"{FastAPICache.get_prefix()}:{namespace}:"
    cache_key = (
        prefix
        + hashlib.md5(  # nosec:B303
            f"{func.__module__}:{func.__name__}:{args}:{kwargs}".encode()
        ).hexdigest()
    )
    return cache_key
```
The conclusion from the source: the key is essentially prefix + function module + function name + the passed-in arguments, hashed together.
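Two properties follow from this scheme: the same function called with the same arguments always maps to the same key, and different arguments map to different keys. A quick check using the same md5-over-module:name:args:kwargs construction (build_key here is a local re-statement, not the library function):

```python
import hashlib

def build_key(prefix, namespace, func, args, kwargs):
    # same construction as default_key_builder: md5 over module, name, arguments
    digest = hashlib.md5(
        f"{func.__module__}:{func.__name__}:{args}:{kwargs}".encode()
    ).hexdigest()
    return f"{prefix}:{namespace}:" + digest

def index():
    return {"hello": "world"}

k1 = build_key("fastapi-cache", "", index, (), {})
k2 = build_key("fastapi-cache", "", index, (), {})
k3 = build_key("fastapi-cache", "", index, ("extra",), {})
print(k1 == k2, k1 == k3)  # True False
```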
4.2.3 Cache Decorator
```python
from functools import wraps
from typing import Callable, Optional, Type

from fastapi_cache import FastAPICache
from fastapi_cache.coder import Coder


def cache(
    expire: int = None,
    coder: Type[Coder] = None,
    key_builder: Callable = None,
    namespace: Optional[str] = "",
):
    """cache all function
    :param namespace:
    :param expire:
    :param coder:
    :param key_builder:
    :return:
    """
    def wrapper(func):
        @wraps(func)
        async def inner(*args, **kwargs):
            # nonlocal: these three names live in the enclosing cache() scope
            nonlocal coder
            nonlocal expire
            nonlocal key_builder
            copy_kwargs = kwargs.copy()
            # extract the current request and response, popping them from kwargs
            request = copy_kwargs.pop("request", None)
            response = copy_kwargs.pop("response", None)
            # if the client sent Cache-Control: no-store, bypass the cache entirely
            if request and request.headers.get("Cache-Control") == "no-store":
                return await func(*args, **kwargs)

            # fall back to the globally configured defaults
            expire = expire or FastAPICache.get_expire()
            coder = coder or FastAPICache.get_coder()
            # use the custom key generator if one was specified
            key_builder = key_builder or FastAPICache.get_key_builder()
            # get the redis or memcache backend
            backend = FastAPICache.get_backend()

            cache_key = key_builder(
                func, namespace, request=request, response=response,
                args=args, kwargs=copy_kwargs,
            )
            # fetch the cached value together with its remaining TTL
            ttl, ret = await backend.get_with_ttl(cache_key)
            if not request:
                if ret is not None:
                    return coder.decode(ret)
                # cache miss: run the coroutine, then store its result
                ret = await func(*args, **kwargs)
                await backend.set(cache_key, coder.encode(ret), expire or FastAPICache.get_expire())
                return ret

            # with a request object, anything other than GET just runs normally
            if request.method != "GET":
                return await func(request, *args, **kwargs)

            if_none_match = request.headers.get("if-none-match")
            # cache hit
            if ret is not None:
                if response:
                    response.headers["Cache-Control"] = f"max-age={ttl}"
                    etag = f"W/{hash(ret)}"
                    # On a repeat request the client sends If-None-Match with the
                    # ETag it saved earlier. If it matches, return 304 with no body
                    # and the client keeps using its local copy; otherwise return
                    # 200 with a fresh ETag.
                    if if_none_match == etag:
                        response.status_code = 304
                        return response
                    response.headers["ETag"] = etag
                return coder.decode(ret)

            # no cached value: run the coroutine and store the result
            ret = await func(*args, **kwargs)
            await backend.set(cache_key, coder.encode(ret), expire or FastAPICache.get_expire())
            return ret

        return inner

    return wrapper
```
I have added comments throughout the code above. Walking through the decorator, its flow is essentially middleware-like; the same decorator technique could also be used for cross-cutting concerns such as request logging!
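To make the flow concrete, here is a heavily simplified version of the same pattern with an in-memory dict as the backend: build the key, look it up, call the coroutine on a miss, then store the result (an illustration only, not fastapi-cache itself; expire is accepted but ignored in this toy):

```python
import asyncio
from functools import wraps

_store = {}

def mini_cache(expire: int = 60):
    def wrapper(func):
        @wraps(func)
        async def inner(*args, **kwargs):
            cache_key = f"{func.__module__}:{func.__name__}:{args}:{kwargs}"
            if cache_key in _store:           # cache hit
                return _store[cache_key]
            ret = await func(*args, **kwargs) # cache miss: run the coroutine
            _store[cache_key] = ret           # store (expire ignored in this toy)
            return ret
        return inner
    return wrapper

calls = 0

@mini_cache(expire=60)
async def index():
    global calls
    calls += 1
    return dict(hello="world")

async def main():
    return await index(), await index()

r1, r2 = asyncio.run(main())
print(r1, calls)  # {'hello': 'world'} 1  (second call served from the cache)
```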
The backends package
This package defines the supported cache backends, typically Redis and memcached, plus an in-memory option.
They all share a base class:
```python
import abc
from typing import Tuple

class Backend:
    @abc.abstractmethod
    async def get_with_ttl(self, key: str) -> Tuple[int, str]:
        raise NotImplementedError

    @abc.abstractmethod
    async def get(self, key: str) -> str:
        raise NotImplementedError

    @abc.abstractmethod
    async def set(self, key: str, value: str, expire: int = None):
        # expire sets the key's time-to-live in seconds; after that the key
        # is no longer available
        raise NotImplementedError

    @abc.abstractmethod
    async def clear(self, namespace: str = None, key: str = None) -> int:
        raise NotImplementedError
```
Any backend we want to add then needs to implement the methods above.
For example, the Redis implementation:
```python
from typing import Tuple
from aioredis import Redis
from fastapi_cache.backends import Backend

class RedisBackend(Backend):
    def __init__(self, redis: Redis):
        self.redis = redis

    async def get_with_ttl(self, key: str) -> Tuple[int, str]:
        p = self.redis.pipeline()
        p.ttl(key)
        p.get(key)
        return await p.execute()

    async def get(self, key) -> str:
        return await self.redis.get(key)

    async def set(self, key: str, value: str, expire: int = None):
        return await self.redis.set(key, value, expire=expire)

    async def clear(self, namespace: str = None, key: str = None) -> int:
        if namespace:
            lua = f"for i, name in ipairs(redis.call('KEYS', '{namespace}:*')) do redis.call('DEL', name); end"
            return await self.redis.eval(lua)
        elif key:
            return await self.redis.delete(key)
```
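Beyond Redis, the same base class can be satisfied by a toy in-memory backend; the sketch below (not part of the library; the fixed TTL of 60 is an assumption of the toy) shows that only the four methods matter:

```python
import asyncio
from typing import Tuple

class InMemoryBackend:
    """Toy Backend implementation over a plain dict."""
    def __init__(self):
        self.store = {}

    async def get_with_ttl(self, key: str) -> Tuple[int, str]:
        value = self.store.get(key)
        ttl = 60 if value is not None else -2  # toy TTL bookkeeping
        return ttl, value

    async def get(self, key: str) -> str:
        return self.store.get(key)

    async def set(self, key: str, value: str, expire: int = None):
        self.store[key] = value  # expire is ignored in this sketch

    async def clear(self, namespace: str = None, key: str = None) -> int:
        if key and key in self.store:
            del self.store[key]
            return 1
        return 0

async def demo():
    backend = InMemoryBackend()
    await backend.set("k", "v", expire=60)
    return await backend.get_with_ttl("k")

ttl, value = asyncio.run(demo())
print(ttl, value)  # 60 v
```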
This design feels quite easy to understand; it's not that complicated!
Other open-source libraries generally follow the same idea, so I won't expand on them here for now. Haha!
4.2.4 A method for setting an expiration time
Some backends compute the TTL with a helper: ttl = calculate_ttl(expire)
```python
def calculate_ttl(expire: Union[int, timedelta]) -> int:
    if isinstance(expire, timedelta):
        expire = int(expire.total_seconds())
    return min(expire, ONE_YEAR_IN_SECONDS)
```
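Filled in with its imports, the helper accepts either an int or a timedelta and caps the result at one year (ONE_YEAR_IN_SECONDS is assumed here to be 60 * 60 * 24 * 365, as the name suggests):

```python
from datetime import timedelta
from typing import Union

ONE_YEAR_IN_SECONDS = 60 * 60 * 24 * 365  # assumed value of the library constant

def calculate_ttl(expire: Union[int, timedelta]) -> int:
    # normalize timedeltas to whole seconds, then cap at one year
    if isinstance(expire, timedelta):
        expire = int(expire.total_seconds())
    return min(expire, ONE_YEAR_IN_SECONDS)

print(calculate_ttl(timedelta(minutes=5)))  # 300
print(calculate_ttl(10 ** 12))              # capped at 31536000
```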
4.2.5 Use functools.partial to extend decorator functions
If we want several variants of the cache decorator above with preset default expiration times, we can do the following:
```python
from functools import partial, update_wrapper

# set expire defaults via partial
cache_one_minute = partial(cache, expire=60)
cache_one_hour = partial(cache, expire=60 * 60)
cache_one_day = partial(cache, expire=60 * 60 * 24)
cache_one_week = partial(cache, expire=60 * 60 * 24 * 7)
cache_one_month = partial(cache, expire=60 * 60 * 24 * 30)
cache_one_year = partial(cache, expire=60 * 60 * 24 * 365)

# copy cache's metadata onto each partial
update_wrapper(cache_one_minute, cache)
update_wrapper(cache_one_hour, cache)
update_wrapper(cache_one_day, cache)
update_wrapper(cache_one_week, cache)
update_wrapper(cache_one_month, cache)
update_wrapper(cache_one_year, cache)
```
These are used exactly the same way as the cache decorator above:
```python
@app.get("/test_cache_one_day")
@cache_one_day()
def partial_cache_one_day(response: Response):
    return 'ok'
```
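The reason update_wrapper matters can be shown on a toy stand-in: a bare partial carries no __name__ or __doc__ of its own until the metadata is copied over (illustrative example, not the fastapi-cache source):

```python
from functools import partial, update_wrapper

def cache(expire: int = None):
    """Toy stand-in for the cache decorator above."""
    def wrapper(func):
        return func
    return wrapper

cache_one_day = partial(cache, expire=60 * 60 * 24)
had_name_before = hasattr(cache_one_day, '__name__')  # False: bare partial

update_wrapper(cache_one_day, cache)                  # copy cache's metadata over
print(had_name_before, cache_one_day.__name__)        # False cache
```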
At the end
Simple notes! For reference only!
END
Jianshu: www.jianshu.com/u/d6960089b…
Juejin: juejin.cn/user/296393…
Public account: search WeChat for 【children to a pot of wolfberry wine tea】
【Original】【Learning and exchange welcome】 QQ: 308711822