This is the 20th day of my participation in the First Challenge 2022.

Hello ~ I’m Milo! I am building an open source interface testing platform from 0 to 1, and also writing a set of corresponding tutorials, I hope you can support more. Welcome to follow my public account Milo test diary, get the latest article tutorial!

review

In the previous section we wrote the interface for getting the “ready” message/read message, but we didn’t write how to generate the message. We are also at a standstill in the processing of broadcast messages.

I’ve been thinking a few days about using messages to decouple action logs from push messages, so far. Only emails or other notifications, as well as message push and operation records are suitable. After all, our link is not long, and it is only a single structure project for the time being.

Introducing too many components makes us more complex and harder to maintain. Consider splitting services and introducing more components if necessary.

The overall train of thought

In fact, before doing a function, or the best plan under the scheme. For me, this ability is still relatively weak, so a general design scheme will be of great help.

  • Websocket

    Because we need the server to actively push data to the client (browser), so it is best that the server can actively notify, after all, the front end has been rotating interface, or there will be certain performance loss.

    If we get the WebSocket connection according to the user ID when the user opens the basic page BasicLayout, and hold it on the server side. Messages are sent to the user when needed.

    Here we define the types of messages: broadcast messages and personal messages. Broadcast messages are similar to system notifications, such as version updates, and personal messages. What I can think of at present is to notify the other party after the test plan is executed, or to follow the case function in the future, and notify the corresponding person when the case changes.

    So we need an object (dictionary) that stores all the connections so that we can push messages.

  • Message storage problem

    For message storage, we will put it in mysql first for the time being. Considering that the number of users is not too large, we can only query the records of 3 months to relieve the pressure of data.

  • Broadcast message problem

    It is planned to open a new table to store the records of users reading broadcast messages, and check whether users have read broadcast messages by combining the table with the message table. If users choose to view all messages, it will be more complicated.

Added enumeration file app/enums/MessageEnum. Py

from enum import IntEnum


class WebSocketMessageEnum(IntEnum) :
    # number of messages
    COUNT = 0
    # Desktop Notifications
    DESKTOP = 1


class MessageStateEnum(IntEnum) :
    "" message State enumeration class ""
    unread = 1  # unread
    read = 2  # read


class MessageTypeEnum(IntEnum) :
    Message type enumeration class
    all = 0  # All messages
    broadcast = 1  # Broadcast message
    others = 2  # Other messages

Copy the code

Three enumerated classes are defined here

  • The content type of the notification

    There are two kinds of messages: quantity and desktop notification. Due to the lack of reference materials and the fact that I have never done a similar project, the definition may be strange, so we can only provide reference.

  • The message state

    Read and unread.

  • Message type

    All messages/broadcast messages/other messages (also known as personal messages)

Write the message return body

app/core/msg/wss_msg.py

from app.enums.MessageEnum import WebSocketMessageEnum


class WebSocketMessage(object) :

    @staticmethod
    def msg_count(count=1, total=False) :
        return dict(type=WebSocketMessageEnum.COUNT, count=count, total=total)

    @staticmethod
    def desktop_msg(title, content=' ') :
        return dict(type=WebSocketMessageEnum.DESKTOP, title=title, content=content)

Copy the code

There are two methods encapsulated inside:

  • Number of messages

    Because we have new messages coming, we will tell the other side how many messages are coming, and the other side can click on the notification page and see all the data. Just like the answer to Zhihu’s invitation:

There’s going to be a red quantity superscript, but we don’t really care what the message is. Users need to click in or open to view.

  • Desktop notification

    Desktop notifications require title and content, one for the title and one for the body. If the website receives the corresponding message, it will directly pop up the desktop dialog box, the difference is that this is temporary data, not written into the data table.

    Generally speaking, the two blocks are returned by dict. In websocket, we will encapsulate the returned data in text, JSON and Bytesmessage formats.

Writing webSocket management classes (written by QYZHG, with minor changes)

app/core/ws_connection_manager.py

# import abc
from typing import TypeVar

from fastapi import WebSocket

from app.core.msg.wss_msg import WebSocketMessage
from app.crud.notification.NotificationDao import PityNotificationDao
from app.models.notification import PityNotification
from app.utils.logger import Log

MsgType = TypeVar('MsgType'.str.dict.bytes)


# class MsgSender(metaclass=abc.ABCMeta):
# @abc.abstractmethod
# def send_text(self):
# pass
#
# @abc.abstractmethod
# def send_json(self):
# pass
#
# @abc.abstractmethod
# def send_bytes(self):
# pass


class ConnectionManager:
    BROADCAST = -1
    logger = Log("wss_manager")

    def __init__(self) :
        self.active_connections: dict[int, WebSocket] = {}
        self.log = Log("websocket")

    async def connect(self, websocket: WebSocket, client_id: int) - >None:
        await websocket.accept()
        exist: WebSocket = self.active_connections.get(client_id)
        if exist:
            await exist.close()
            self.active_connections[client_id]: WebSocket = websocket
        else:
            self.active_connections[client_id]: WebSocket = websocket
            self.log.info(F"websocket:{client_id}: Connection established successfully!)

    def disconnect(self, client_id: int) - >None:
        del self.active_connections[client_id]
        self.log.info(F"websocket:{client_id}: Safely disconnected!")

    @staticmethod
    async def pusher(sender: WebSocket, message: MsgType) - >None:
        Call different methods to send messages according to different message types.
        msg_mapping: dict = {
            str: sender.send_text,
            dict: sender.send_json,
            bytes: sender.send_bytes
        }
        if func_push_msg := msg_mapping.get(type(message)):
            await func_push_msg(message)
        else:
            raise TypeError(F"websocket cannot send{type(message)}Content!")

    async def send_personal_message(self, user_id: int, message: MsgType) - >None:
        """ Sending personal information """
        conn = self.active_connections.get(user_id)
        if conn:
            await self.pusher(sender=conn, message=message)

    async def broadcast(self, message: MsgType) - >None:
        "" "broadcast "" "
        for connection in self.active_connections.values():
            await self.pusher(sender=connection, message=message)

    async def notify(self, user_id, title=None, content=None, notice: PityNotification = None) :
        Content: :param title: :param user_id: :param Notice: :return: ""
        try:
            Check whether it is desktop notification
            if title is not None:
                msg = WebSocketMessage.desktop_msg(title, content)
                if user_id == ConnectionManager.BROADCAST:
                    await self.broadcast(msg)
                else:
                    await self.send_personal_message(user_id, msg)
            else:
                The number of messages is not the number of desktop messages
                if user_id == ConnectionManager.broadcast:
                    await self.broadcast(WebSocketMessage.msg_count())
                else:
                    await self.send_personal_message(user_id, WebSocketMessage.msg_count())
            Determine whether to fall into the push table
            if notice is not None:
                await PityNotificationDao.insert_record(notice)
        except Exception as e:
            ConnectionManager.logger.error(F "Failed to send message,{e}")


ws_manage = ConnectionManager()

Copy the code

The core method: Notify is used to actively send a message to a web page, which we’ll talk about later. Here we make a setting, if the title is not None, we consider it to be the desktop Notification type. There may be some special desktop notifications that need to be dropped in the library, so we check again whether there is Notification, if it is passed in, it means that it needs to be stored in the library.

When user_id is -1, it indicates a broadcast message.

Adjust the message table class

New msg_title field. Msg_type is set to 1, which is a broadcast message.

Added the broadcast read user table

app/models/broadcast_read_user.py

from datetime import datetime

from sqlalchemy import Column, INT, DATETIME, BIGINT

from app.models import Base


class PityBroadcastReadUser(Base) :
    id = Column(BIGINT, primary_key=True)
    notification_id = Column(INT, comment="Corresponding message ID", index=True)
    read_user = Column(INT, comment="Read user ID")
    read_time = Column(DATETIME, comment="Read time")

    __tablename__ = "pity_broadcast_read_user"

    def __init__(self, notification_id: int, read_user: int) :
        self.notification_id = notification_id
        self.read_user = read_user
        self.read_time = datetime.now()
        self.id = None

Copy the code

This table is relatively simple, just need to write message ID + read time (in fact, do not need to) + read user.

Write the corresponding DAO class

app/crud/notification/BroadcastReadDao.py

from app.crud import Mapper
from app.models.broadcast_read_user import PityBroadcastReadUser
from app.utils.decorator import dao
from app.utils.logger import Log


@dao(PityBroadcastReadUser, Log("BroadcastReadDao"))
class BroadcastReadDao(Mapper) :
    pass

Copy the code

I’m going to stop there for the sake of space, and in the next video we’re going to continue with sugar cane.