
Background

In the previous article, How to properly use the sarama package to operate Kafka in Golang?, I introduced how to use Kafka in Golang, as well as common IM problems such as message loss, out-of-order delivery, and repeated consumption, and their solutions.

In this article, we’ll take a closer look at how Kafka can be used between modules.

The discussion proceeds from a traditional architecture (TeamTalk), to Guazi IM (production grade, but no public code), to OpenIM (an open-source newcomer).

What does “traditional” mean here: a single-process (or few-process) approach, serving on the order of 100,000 users. What does “modern” mean: millions of users, served by a distributed microservice architecture.

Traditional architecture

The traditional architecture (around 100,000 users) is still based on the multi-process idea. Take TeamTalk as an example: TeamTalk is the internal enterprise communication software that Mogujie open-sourced about five years ago (2015), and it was quite popular at the time. Its architecture is shown below:

A brief introduction to how it works:

  • Login: the client sends an HTTP request to the login service (which acts as a load balancer) and obtains the IP address of an MSG instance with a low load (load here meaning the number of logged-in users, i.e. the number of TCP connections).
  • MSG: the client connects to MSG over TCP for login authentication, session queries, and sending and receiving messages. MSG plays the gateway role in the architecture: it mainly manages TCP connections and talks to clients, while business logic and message storage live in DBProxy. Because MSG is stateless, it can be scaled horizontally. It is implemented as a single process using epoll I/O multiplexing; one instance can comfortably hold 30,000 to 50,000 connections, so two or three instances can keep 100,000 users online.
  • Route: MSG instances communicate with each other through a routing service, which solves the problem of delivering messages between users connected to different MSG instances (a minimal sketch of the idea follows this list).
  • DBProxy: business logic and message storage. It is also stateless, so you can deploy two instances, one responsible for the login service and one for the message service.
  • There are other modules that I won’t cover here, such as HTTPMsg (which exposes HTTP interfaces for external calls), the push service, the file service, and so on.
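To make the Route module’s role concrete, here is a minimal, purely illustrative Go sketch (TeamTalk itself is C++, and none of these names come from its code): the route service keeps a userID → MSG-instance mapping, so a message addressed to a user connected to a different MSG instance can be forwarded to the right gateway.

package main

import "sync"

// RouteTable is an illustrative in-memory userID -> MSG-instance mapping.
// TeamTalk's real route service is C++; this only shows the idea.
type RouteTable struct {
    mu     sync.RWMutex
    routes map[string]string // userID -> address of the MSG instance holding the user's TCP connection
}

func NewRouteTable() *RouteTable {
    return &RouteTable{routes: make(map[string]string)}
}

// Register is called when a user logs in on an MSG instance.
func (r *RouteTable) Register(userID, msgAddr string) {
    r.mu.Lock()
    defer r.mu.Unlock()
    r.routes[userID] = msgAddr
}

// Lookup tells the sender's MSG which peer MSG should receive the message.
func (r *RouteTable) Lookup(userID string) (string, bool) {
    r.mu.RLock()
    defer r.mu.RUnlock()
    addr, ok := r.routes[userID]
    return addr, ok
}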

This architecture is perfectly fine at the 100,000-user level and suits internal enterprise communication, but at the million-user level several problems appear:

  1. Single point of failure: Route. All MSG instances must connect to a single route service for message routing; if that service dies, routing stops. Of course, this can be mitigated at the operations level with HAProxy and Keepalived.
  2. MSG talks to DBProxy directly. If requests become very dense within a short period, the pressure on DBProxy spikes. Although DBProxy uses a thread pool and a local task queue, requests are processed first come, first served, so they pile up and later requests fail or time out.
  3. DBProxy is a large module, so changes are expensive. If a new feature is not well tested, a crash or bug after launch takes the whole service down. Another point is that different services have very different call frequencies; login, for example, is the most frequent because users keep reopening the app, so it should be split out and given its own MySQL instance to reduce pressure.

Therefore, the architecture must be upgraded to support a larger volume. The route problem can be solved by adding Kafka between the MSG layer and the DBProxy layer.

This is one of the reasons why I think modern IM architectures use an MQ. The benefits of introducing Kafka:

  • Kafka can handle QPS in the millions, while MySQL’s QPS is harder to pin down but is roughly a few thousand to tens of thousands, far lower than Kafka’s. So before messages are stored in MySQL, let Kafka take the brunt of the traffic and keep MySQL from being overwhelmed.
  • Decoupling: the availability of services is unambiguously important. With Kafka introduced, we move from strong dependencies between systems, where modifying one module can affect the others, to weak, data-based dependencies. Because different Kafka consumer groups can consume the same messages at the same time, a large module can be broken into smaller ones that each handle only the messages they care about (a sketch follows below).

These two points are, for me, the key to going from 100,000 users to millions.
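To make the decoupling point concrete, here is a minimal sketch using the sarama package (from the previous article) of two independent consumer groups reading the same topic; the topic and group names are made up. A storage module and a push module each receive every message without knowing about each other, and either one can be restarted or replaced on its own.

package main

import (
    "context"
    "log"

    "github.com/Shopify/sarama"
)

// handler implements sarama.ConsumerGroupHandler and just logs what it receives.
type handler struct{ name string }

func (h handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (h handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (h handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    for msg := range claim.Messages() {
        log.Printf("[%s] partition=%d offset=%d", h.name, msg.Partition, msg.Offset)
        sess.MarkMessage(msg, "") // mark the offset only after this module has done its own work
    }
    return nil
}

// consumeAs runs one consumer group; every group receives its own full copy of the topic.
func consumeAs(groupID, topic string) {
    cfg := sarama.NewConfig()
    cfg.Version = sarama.V2_0_0_0
    group, err := sarama.NewConsumerGroup([]string{"127.0.0.1:9092"}, groupID, cfg)
    if err != nil {
        log.Fatal(err)
    }
    for {
        if err := group.Consume(context.Background(), []string{topic}, handler{name: groupID}); err != nil {
            log.Println("consume error:", err)
        }
    }
}

func main() {
    // A "storage" module and a "push" module consume the same topic independently.
    go consumeAs("storage-group", "im_chat")
    consumeAs("push-group", "im_chat")
}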

Modern architecture

Recently I have mainly studied Guazi IM (the IM behind the Guazi used-car business) and OpenIM. The author of Guazi IM, Feng Yu, shared a series of articles with detailed sequence diagrams, protocol design, database and table sharding, TimeLine-model synchronization, and so on (links at the end of this article); after reading them you will have a general picture. Open-IM-Server is an open-source project started by former WeChat technical experts. It is written in Go and uses the inbox model, and it pairs well with the Guazi IM design documents to deepen understanding.

Guazi IM

(Image source: WeChat official account “Common Programmer”, author Feng Yu)

(The architecture diagram includes a Kafka layer.)

  • A complete design of an instant messaging (IM) system for massive online users (Plus): mp.weixin.qq.com/s/TYUNPgf_3…

After reading this, you are probably wondering, as I was: how exactly is Kafka used here 🤔?

Open-IM-Server

(Photo credit: github.com/OpenIMSDK/O…)

The author appears to be from the WeChat team, so the architecture is naturally based on the inbox + write-diffusion (fan-out on write) model. The project is still being improved and has implemented the basic functions of single chat and group chat.

Here’s an example. Like me, you might at first think sending a message works like this:

  1. The gate gateway produces two messages to Kafka (steps 2 and 3 above) and replies to the sender that the send succeeded (step 4).
  2. Through different consumer group names, the message is consumed simultaneously by multiple services. In config.yaml you can see the Kafka configuration: 2 topics and 3 consumer groups:
ws2mschat:
  addr: [ 127.0.0.1:9092 ]
  topic: "ws2ms_chat"
ms2pschat:
  addr: [ 127.0.0.1:9092 ]
  topic: "ms2ps_chat"
consumergroupid:
  msgToMongo: mongo
  msgToMySql: mysql
  msgToPush: push
  3. The transfer service persists messages to MySQL and MongoDB through two different consumer groups.
  4. Since pusher and transfer belong to different consumer groups, pusher consumes the same messages concurrently and forwards them via RPC to the gate, which delivers them to the other party (write diffusion: one copy to the sender’s outbox and one to the receiver’s inbox). If the peer is offline, push through a third-party push service; if the peer is online, push through the WebSocket.

After going through this, I couldn’t help asking two questions:

  • Where is the message sequence number generated? I didn’t see one; how else would the client sort messages?
  • The sender gets its success reply before the message is persisted to MySQL and MongoDB. If Kafka loses the message, could the data become inconsistent?

The second question was already answered in How to properly use the sarama package to operate Kafka in Golang: as long as the producer and consumer are used correctly, the data will end up consistent. Why not demand strong consistency here? My understanding is that for an online user the push has already happened and the client will align the data itself (local storage); an offline client pulls and aligns after coming back online, which gives our services plenty of time to catch up. Of course, bugs (for example, consumers crashing) are another matter.
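As a side note, “using the producer and consumer correctly” boils down to a few sarama settings. Here is a minimal sketch of the reliability-related configuration implied above (the exact values OpenIM uses may differ):

import "github.com/Shopify/sarama"

// newReliableConfig sketches the settings behind "no message loss, eventual consistency":
// the producer waits for all in-sync replicas and retries, and the consumer commits
// offsets only after its MySQL/MongoDB write has succeeded.
func newReliableConfig() *sarama.Config {
    cfg := sarama.NewConfig()
    cfg.Producer.RequiredAcks = sarama.WaitForAll  // wait until all in-sync replicas have the message
    cfg.Producer.Retry.Max = 5
    cfg.Producer.Return.Successes = true           // required by the SyncProducer
    cfg.Consumer.Offsets.AutoCommit.Enable = false // commit manually after the work is done
    return cfg
}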

So, let’s analyze the first question.

Take a closer look at the Kafka process in Open-IM-Server

I’ve drawn a simplified version of this:

After combing through the flow and reading the source code, I corrected several mistakes in my initial understanding:

  1. msg-gateway does not deliver messages to Kafka itself; it calls the chat service via gRPC, and chat produces the two messages to Kafka (outbox and inbox).
  2. chat locally generates only the MsgID and the send time, and returns the MsgID to the client. The client deduplicates with the server-generated MsgID and sorts by send time.
  3. The message sequence number is generated in the transfer’s consumer. At first I thought there was no seq, but then I saw that a seq is assigned before writing to MongoDB. Is that a problem? In my view the seq should be generated in chat; otherwise the client gets a seq when pulling but only a sendTime when sending.
  4. There are three consumer groups on the Kafka side: MySQL persistence, offline message storage in MongoDB, and push tasks.
  5. Transfer’s mongo consumer, after writing to MongoDB (if the client set the isHistory option), first tries to push the message to the client by calling pusher directly over gRPC; otherwise it takes another trip through Kafka.

Key code of the chat service (send_msg.go):

func GetMsgID(sendID string) string {
   t := time.Now().Format("2006-01-02 15:04:05")
   return t + "-" + sendID + "-" + strconv.Itoa(rand.Int())
}

func (rpc *rpcChat) UserSendMsg(_ context.Context, pb *pbChat.UserSendMsgReq) (*pbChat.UserSendMsgResp, error) {
    log.InfoByKv("sendMsg", pb.OperationID, "args", pb.String())

    // Get a message ID, as seen above, via a timestamp + a random number string; it is not fixed-length yet, so the implementation is a bit weak.
    serverMsgID := GetMsgID(pb.SendID)
    pbData := pbChat.WSToMsgSvrChatMsg{}
    pbData.MsgFrom = pb.MsgFrom
    pbData.SessionType = pb.SessionType
    // The send time is the time the service received the message, because the client's clock is not reliable
    // OpenIMServer does not use a separate field for sorting, but uses the send time, which I feel is a bit flawed
    // When deploying multiple machines, the time may be inconsistent, and NTPD calibration cannot guarantee 100% consistency
    pbData.SendTime = utils.GetCurrentTimestampByNano()
    // ...
    switch pbData.SessionType {
        case constant.SingleChatType:
            // Send to kafka, diffuse write mode
            // Your own outbox, their inbox
            err1 := rpc.sendMsgToKafka(&pbData, pbData.RecvID)
            err2 := rpc.sendMsgToKafka(&pbData, pbData.SendID)
            if err1 != nil || err2 != nil {
                return returnMsg(&replay, pb, 201, "kafka send msg err", "", 0)
            }
            return returnMsg(&replay, pb, 0, "", serverMsgID, pbData.SendTime)
        case constant.GroupChatType:
            // ...
            return returnMsg(&replay, pb, 0."", serverMsgID, pbData.SendTime)
        default:
    }
    // ...
}
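sendMsgToKafka itself is not shown above. Below is a minimal sketch of what it might look like, assuming a sarama SyncProducer created at startup and protobuf serialization; the signature and topic wiring are illustrative, not OpenIM’s exact code. The point is the second argument: it becomes the Kafka message key, so all messages keyed by the same user ID land on the same partition and per-user ordering is preserved.

// A minimal sketch, assuming a sarama.SyncProducer is passed in;
// the topic matches the ws2ms_chat entry in config.yaml.
func sendMsgToKafka(producer sarama.SyncProducer, m *pbChat.WSToMsgSvrChatMsg, key string) error {
    body, err := proto.Marshal(m) // the payload is the protobuf-encoded message
    if err != nil {
        return err
    }
    _, _, err = producer.SendMessage(&sarama.ProducerMessage{
        Topic: "ws2ms_chat",
        Key:   sarama.StringEncoder(key), // RecvID or SendID: one partition per user, so per-user order holds
        Value: sarama.ByteEncoder(body),
    })
    return err
}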

Key code of the transfer service’s mongo consumer:

func (mc *HistoryConsumerHandler) handleChatWs2Mongo(msg []byte, msgKey string) {
   log.InfoByKv("chat come mongo!!!".""."chat".string(msg))
   pbData := pbMsg.WSToMsgSvrChatMsg{}
   err := proto.Unmarshal(msg, &pbData)

   if err != nil {
      log.ErrorByKv("msg_transfer Unmarshal chat err".""."chat".string(msg), "err", err.Error())
      return
   }

   pbSaveData := pbMsg.MsgSvrToPushSvrChatMsg{}
   pbSaveData.SendID = pbData.SendID
   pbSaveData.SenderNickName = pbData.SenderNickName
   pbSaveData.SenderFaceURL = pbData.SenderFaceURL
   pbSaveData.ClientMsgID = pbData.ClientMsgID
   pbSaveData.SendTime = pbData.SendTime
   pbSaveData.Content = pbData.Content
   pbSaveData.MsgFrom = pbData.MsgFrom
   pbSaveData.ContentType = pbData.ContentType
   pbSaveData.SessionType = pbData.SessionType
   pbSaveData.MsgID = pbData.MsgID
   pbSaveData.OperationID = pbData.OperationID
   pbSaveData.RecvID = pbData.RecvID
   pbSaveData.PlatformID = pbData.PlatformID
   Options := utils.JsonStringToMap(pbData.Options)
   //Control whether to store offline messages (mongo)
   isHistory := utils.GetSwitchFromOptions(Options, "history")
   //Control whether to store history messages (mysql)

   isPersist := utils.GetSwitchFromOptions(Options, "persistent")

   if pbData.SessionType == constant.SingleChatType {
      log.Info("".""."msg_transfer chat type = SingleChatType", isHistory, isPersist)

      if isHistory { // The client is enabled to store messages
         if msgKey == pbSaveData.RecvID {
            err := saveUserChat(pbData.RecvID, &pbSaveData)
            if err != nil {
               log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
            }
         } else if msgKey == pbSaveData.SendID {
            err := saveUserChat(pbData.SendID, &pbSaveData)
            if err != nil {
               log.ErrorByKv("data insert to mongo err", pbSaveData.OperationID, "data", pbSaveData.String(), "err", err.Error())
            }
         }
      }

      if msgKey == pbSaveData.RecvID {
         pbSaveData.Options = pbData.Options
         pbSaveData.OfflineInfo = pbData.OfflineInfo
         sendMessageToPush(&pbSaveData)
      }
      log.InfoByKv("msg_transfer handle topic success...".""."")}else if pbData.SessionType == constant.GroupChatType {
      log.Info("".""."msg_transfer chat type = GroupChatType")

      if isHistory {
         // The delimiter was lost when the code was copied; in context RecvID holds "uid groupID", so a space is assumed here.
         uidAndGroupID := strings.Split(pbData.RecvID, " ")
         saveUserChat(uidAndGroupID[0], &pbSaveData)
      }

      pbSaveData.Options = pbData.Options
      pbSaveData.OfflineInfo = pbData.OfflineInfo
      sendMessageToPush(&pbSaveData)
      log.InfoByKv("msg_transfer handle topic success...".""."")}else {
      log.Error("".""."msg_transfer recv chat err, chat.MsgFrom = %d", pbData.SessionType)
   }
}

func sendMessageToPush(message *pbMsg.MsgSvrToPushSvrChatMsg) {
   log.InfoByKv("msg_transfer send message to push", message.OperationID, "message". message.String()) msg := pbPush.PushMsgReq{} msg.OperationID = message.OperationID msg.PlatformID = message.PlatformID msg.Content = message.Content msg.ContentType = message.ContentType msg.SessionType = message.SessionType msg.RecvID = message.RecvID msg.SendID = message.SendID msg.SenderNickName = message.SenderNickName msg.SenderFaceURL = message.SenderFaceURL msg.ClientMsgID = message.ClientMsgID msg.MsgFrom = message.MsgFrom msg.Options = message.Options msg.RecvSeq = message.RecvSeq msg.SendTime = message.SendTime msg.MsgID = message.MsgID msg.OfflineInfo = message.OfflineInfo// Try gRPC first, otherwise send directly to Kafka, let push consume from Kafka and push
   grpcConn := getcdv3.GetConn(config.Config.Etcd.EtcdSchema, strings.Join(config.Config.Etcd.EtcdAddr, ","), config.Config.RpcRegisterName.OpenImPushName)

   if grpcConn == nil {
      log.ErrorByKv("rpc dial failed", msg.OperationID, "push data", msg.String())
      pid, offset, err := producer.SendMessage(message)
      if err != nil {
         log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
      }
      return
   }

   msgClient := pbPush.NewPushMsgServiceClient(grpcConn)
   _, err := msgClient.PushMsg(context.Background(), &msg)
   if err != nil {
      log.ErrorByKv("rpc send failed", msg.OperationID, "push data", msg.String(), "err", err.Error())
      pid, offset, err := producer.SendMessage(message)
      if err != nil {
         log.ErrorByKv("kafka send failed", msg.OperationID, "send data", message.String(), "pid", pid, "offset", offset, "err", err.Error())
      }
   } else {
      log.InfoByKv("rpc send success", msg.OperationID, "push data", msg.String())
   }
}

Take a look at the process with a sequence diagram:

  1. The client sends a message to msg_gateway over WebSocket
  2. msg_gateway calls chat’s UserSendMsg() via gRPC to send the message
  3. The chat service locally generates the unique message ID (for deduplication) and the send time
  4. The message is then sent to Kafka; once all Kafka replicas have received it, the send is considered successful
  5. The gRPC call returns
  6. An ACK is replied to the client with the error code, the server-generated MsgID, and so on
  7. The MySQL consumer group consumes the 2 messages (sender’s outbox, receiver’s inbox)
  8. Everything is persisted in MySQL to support background analysis and auditing. Clients pull from MongoDB (messages are deleted after being pulled), which is similar to WeChat’s logic. WeChat says it does not store data on the server, so when you log in to WeChat on a PC, you find that messages on the phone cannot be seen on the PC: either the PC does not perform the pull, or offline messages are only served to the app and cannot be pulled on a PC.
  9. Similarly, in transfer, the mongo consumer group consumes the two messages
  10. A per-user sequence number is allocated by incrementing the Redis key "REDIS_USER_INCR_SEQ:" + UserID (a minimal sketch follows this list)
  11. The message is inserted into the Chat collection in MongoDB
  12. The preferred path is to push by calling pusher over gRPC; otherwise the message goes through Kafka and pusher consumes and pushes it
  13. Pusher in turn calls msg_gateway’s MsgToUser via gRPC to push the message
  14. The message is pushed over the WebSocket
  15. When user B comes online, B pulls offline messages from MongoDB (they are deleted from MongoDB after the pull succeeds)
  16. ...
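For step 10, here is a minimal sketch of per-user sequence allocation with Redis INCR, using the go-redis client purely for illustration (the key prefix matches the one above; OpenIM’s actual Redis wrapper and error handling differ):

package main

import (
    "context"
    "fmt"

    "github.com/go-redis/redis/v8"
)

const userIncrSeq = "REDIS_USER_INCR_SEQ:" // per-user sequence counter prefix (step 10)

// incrUserSeq atomically allocates the next sequence number for one user's inbox.
// Redis INCR is atomic, so concurrent transfer consumers can never hand out the same seq twice.
func incrUserSeq(ctx context.Context, rdb *redis.Client, userID string) (int64, error) {
    return rdb.Incr(ctx, userIncrSeq+userID).Result()
}

func main() {
    rdb := redis.NewClient(&redis.Options{Addr: "127.0.0.1:6379"})
    seq, err := incrUserSeq(context.Background(), rdb, "user_123")
    if err != nil {
        panic(err)
    }
    fmt.Println("next seq for user_123:", seq)
}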

Conclusion

At this point, it is relatively clear how to use Kafka in IM, and you can read the code for more details.

References

WeChat official account “Common Programmer”:

  • A complete design of an instant messaging (IM) system for massive online users (Plus)
  • A complete design of an instant messaging (IM) system for massive online users
  • The session structure of an IM system
  • How to search local chat history on the IM mobile client
  • Message synchronization based on the TimeLine model
  • Keeping messages ordered and not lost under the TimeLine model