This is the 20th day of my participation in the First Challenge 2022.
Hello ~ I’m Milo! I am building an open source interface testing platform from 0 to 1, and also writing a set of corresponding tutorials, I hope you can support more. Welcome to follow my public account Milo test diary, get the latest article tutorial!
review
In the previous section we wrote the interface for getting the “ready” message/read message, but we didn’t write how to generate the message. We are also at a standstill in the processing of broadcast messages.
I’ve been thinking a few days about using messages to decouple action logs from push messages, so far. Only emails or other notifications, as well as message push and operation records are suitable. After all, our link is not long, and it is only a single structure project for the time being.
Introducing too many components makes us more complex and harder to maintain. Consider splitting services and introducing more components if necessary.
The overall train of thought
In fact, before doing a function, or the best plan under the scheme. For me, this ability is still relatively weak, so a general design scheme will be of great help.
-
Websocket
Because we need the server to actively push data to the client (browser), so it is best that the server can actively notify, after all, the front end has been rotating interface, or there will be certain performance loss.
If we get the WebSocket connection according to the user ID when the user opens the basic page BasicLayout, and hold it on the server side. Messages are sent to the user when needed.
Here we define the types of messages: broadcast messages and personal messages. Broadcast messages are similar to system notifications, such as version updates, and personal messages. What I can think of at present is to notify the other party after the test plan is executed, or to follow the case function in the future, and notify the corresponding person when the case changes.
So we need an object (dictionary) that stores all the connections so that we can push messages.
-
Message storage problem
For message storage, we will put it in mysql first for the time being. Considering that the number of users is not too large, we can only query the records of 3 months to relieve the pressure of data.
-
Broadcast message problem
It is planned to open a new table to store the records of users reading broadcast messages, and check whether users have read broadcast messages by combining the table with the message table. If users choose to view all messages, it will be more complicated.
Added enumeration file app/enums/MessageEnum. Py
from enum import IntEnum
class WebSocketMessageEnum(IntEnum) :
# number of messages
COUNT = 0
# Desktop Notifications
DESKTOP = 1
class MessageStateEnum(IntEnum) :
"" message State enumeration class ""
unread = 1 # unread
read = 2 # read
class MessageTypeEnum(IntEnum) :
Message type enumeration class
all = 0 # All messages
broadcast = 1 # Broadcast message
others = 2 # Other messages
Copy the code
Three enumerated classes are defined here
-
The content type of the notification
There are two kinds of messages: quantity and desktop notification. Due to the lack of reference materials and the fact that I have never done a similar project, the definition may be strange, so we can only provide reference.
-
The message state
Read and unread.
-
Message type
All messages/broadcast messages/other messages (also known as personal messages)
Write the message return body
app/core/msg/wss_msg.py
from app.enums.MessageEnum import WebSocketMessageEnum
class WebSocketMessage(object) :
@staticmethod
def msg_count(count=1, total=False) :
return dict(type=WebSocketMessageEnum.COUNT, count=count, total=total)
@staticmethod
def desktop_msg(title, content=' ') :
return dict(type=WebSocketMessageEnum.DESKTOP, title=title, content=content)
Copy the code
There are two methods encapsulated inside:
-
Number of messages
Because we have new messages coming, we will tell the other side how many messages are coming, and the other side can click on the notification page and see all the data. Just like the answer to Zhihu’s invitation:
There’s going to be a red quantity superscript, but we don’t really care what the message is. Users need to click in or open to view.
-
Desktop notification
Desktop notifications require title and content, one for the title and one for the body. If the website receives the corresponding message, it will directly pop up the desktop dialog box, the difference is that this is temporary data, not written into the data table.
Generally speaking, the two blocks are returned by dict. In websocket, we will encapsulate the returned data in text, JSON and Bytesmessage formats.
Writing webSocket management classes (written by QYZHG, with minor changes)
app/core/ws_connection_manager.py
# import abc
from typing import TypeVar
from fastapi import WebSocket
from app.core.msg.wss_msg import WebSocketMessage
from app.crud.notification.NotificationDao import PityNotificationDao
from app.models.notification import PityNotification
from app.utils.logger import Log
MsgType = TypeVar('MsgType'.str.dict.bytes)
# class MsgSender(metaclass=abc.ABCMeta):
# @abc.abstractmethod
# def send_text(self):
# pass
#
# @abc.abstractmethod
# def send_json(self):
# pass
#
# @abc.abstractmethod
# def send_bytes(self):
# pass
class ConnectionManager:
BROADCAST = -1
logger = Log("wss_manager")
def __init__(self) :
self.active_connections: dict[int, WebSocket] = {}
self.log = Log("websocket")
async def connect(self, websocket: WebSocket, client_id: int) - >None:
await websocket.accept()
exist: WebSocket = self.active_connections.get(client_id)
if exist:
await exist.close()
self.active_connections[client_id]: WebSocket = websocket
else:
self.active_connections[client_id]: WebSocket = websocket
self.log.info(F"websocket:{client_id}: Connection established successfully!)
def disconnect(self, client_id: int) - >None:
del self.active_connections[client_id]
self.log.info(F"websocket:{client_id}: Safely disconnected!")
@staticmethod
async def pusher(sender: WebSocket, message: MsgType) - >None:
Call different methods to send messages according to different message types.
msg_mapping: dict = {
str: sender.send_text,
dict: sender.send_json,
bytes: sender.send_bytes
}
if func_push_msg := msg_mapping.get(type(message)):
await func_push_msg(message)
else:
raise TypeError(F"websocket cannot send{type(message)}Content!")
async def send_personal_message(self, user_id: int, message: MsgType) - >None:
""" Sending personal information """
conn = self.active_connections.get(user_id)
if conn:
await self.pusher(sender=conn, message=message)
async def broadcast(self, message: MsgType) - >None:
"" "broadcast "" "
for connection in self.active_connections.values():
await self.pusher(sender=connection, message=message)
async def notify(self, user_id, title=None, content=None, notice: PityNotification = None) :
Content: :param title: :param user_id: :param Notice: :return: ""
try:
Check whether it is desktop notification
if title is not None:
msg = WebSocketMessage.desktop_msg(title, content)
if user_id == ConnectionManager.BROADCAST:
await self.broadcast(msg)
else:
await self.send_personal_message(user_id, msg)
else:
The number of messages is not the number of desktop messages
if user_id == ConnectionManager.broadcast:
await self.broadcast(WebSocketMessage.msg_count())
else:
await self.send_personal_message(user_id, WebSocketMessage.msg_count())
Determine whether to fall into the push table
if notice is not None:
await PityNotificationDao.insert_record(notice)
except Exception as e:
ConnectionManager.logger.error(F "Failed to send message,{e}")
ws_manage = ConnectionManager()
Copy the code
The core method: Notify is used to actively send a message to a web page, which we’ll talk about later. Here we make a setting, if the title is not None, we consider it to be the desktop Notification type. There may be some special desktop notifications that need to be dropped in the library, so we check again whether there is Notification, if it is passed in, it means that it needs to be stored in the library.
When user_id is -1, it indicates a broadcast message.
Adjust the message table class
New msg_title field. Msg_type is set to 1, which is a broadcast message.
Added the broadcast read user table
app/models/broadcast_read_user.py
from datetime import datetime
from sqlalchemy import Column, INT, DATETIME, BIGINT
from app.models import Base
class PityBroadcastReadUser(Base) :
id = Column(BIGINT, primary_key=True)
notification_id = Column(INT, comment="Corresponding message ID", index=True)
read_user = Column(INT, comment="Read user ID")
read_time = Column(DATETIME, comment="Read time")
__tablename__ = "pity_broadcast_read_user"
def __init__(self, notification_id: int, read_user: int) :
self.notification_id = notification_id
self.read_user = read_user
self.read_time = datetime.now()
self.id = None
Copy the code
This table is relatively simple, just need to write message ID + read time (in fact, do not need to) + read user.
Write the corresponding DAO class
app/crud/notification/BroadcastReadDao.py
from app.crud import Mapper
from app.models.broadcast_read_user import PityBroadcastReadUser
from app.utils.decorator import dao
from app.utils.logger import Log
@dao(PityBroadcastReadUser, Log("BroadcastReadDao"))
class BroadcastReadDao(Mapper) :
pass
Copy the code
I’m going to stop there for the sake of space, and in the next video we’re going to continue with sugar cane.