Gobelieve making address

Disclaimer: Turn the book JackieF’s article, for their own convenience copy, add some of their own things. Link: https://www.jianshu.com/p/8121d6e85282


IM Core is divided into three parts:

Im client connection server (distributed deployment, no load balancing module)

Imr route query server (mainly to solve the problem of distributed IM deployment)

Ims Storage server (primary/secondary deployment)



Basic module

1. Packet protocol

Package: header (12) | body header: len (4), seq (4), CMD (1), the version (1), air (2)Copy the code

2. Data sending and receiving process

acceptReceiving a connection opens the writer thread and the reader thread writer thread: listeningclient.wtBlock the queue and write as soon as data is availableconnRead thread: According to the packet protocol fromconnRead the packet fromclient.HandleMessageTo deal withCopy the code

3. Several methods

PushMessagethroughroute_channelsendMSG_PUBLISHIMR
PushGroupMessagethroughroute_channelsendMSG_PUBLISH_GROUPIMR
SaveMessagethroughIMS RPCThe service callSavePeerMessage
SaveGroupMessagethroughIMS RPCThe service callSaveGroupMessage
im_client.SendMessage
    1.PushMessage2. Query the local route andEnqueueMessage
im_client.SendGroupMessage
    1.PushGroupMessage
    2.group_managerThe querygroup3. BygroupGet all themenberFor eachmenberQuery the routing table andEnqueueMessage
im_client.EnqueueMessageWrite dataclient.wtFor sending outCopy the code

IM module

The IM module is initialized. Procedure

1.redis_pool

2. Ims storage_pools connection: 3333

Called when the HTTP server reads the latest messageCopy the code

3. Rpc_clients ims: 13333

SyncMessage
SyncGroupMessage
SavePeerMessage
SaveGroupMessage
Copy the code

4. Group_rpc_clients (optional)

5. Imr route_channels connection: 4444

Enable read-write thread Write: Fromchannel.wtThe pipe values are sent concurrentlyimrRead:imrReceive the message and distribute it to the currentimNode Connection UserCopy the code

6.group_manager

1.loadFrom:mysqlloadinggroupTo keep,group_manager.groups
2.run: reidsTo subscribe tocase group_create,group_disband,group_member_add,group_member_remove,group_upgrade, callback processing add, delete, change and checkgroup_manager.groups
case pingDirty data Detection 3.ping: Send every five minutesping
Copy the code

Group_message_deliver: Ordinary group message distribution

1.init: Creates local storage file 2.run: listeningwtPipeline, data indicates that a new message has been written to the file read the file and sentCopy the code

8. ListenRedis silence

Redis subscribesspeak_forbidden Accepts event push, queries the local route to the corresponding client, and modifies the forbidden field.Copy the code

9.SyncKeyService

Values from the group_sync_c and sync_c channels, Save to REDIS (MSG_GROUP_SYNC_KEY and MSG_SYNC_KEY messages sent by the client will write the synchronization key within the message to the corresponding pipes group_sync_c and sync_c)Copy the code

10.StartHttpServer :6666

The web serverCopy the code

11. Connect to the client

StartSocketIO :websocket (webpage)

ListenClient: TCP 23000 (iOS, Android)

A client is generated when a long connection request is initiated and the verification succeeds.

Client connection usage: client.read() client.write() Read Cache Write Cache about 8 KB each

ListenClient process flow

1. Login authentication CMD :MSG_AUTH_TOKEN

The client willuidwithtokenTo the server byredis_poolQuery authentication Authentication succeeded: 1EnqueueMessageSend message {cmd:MSG_AUTH_STATUS,status:0}
        2.client.AddClient() Caches this connection to local routing table 3.client.IMClient.Login() Cache this link toIMRRouting table authentication failed: the routing table authentication failedEnqueueMessageSend message {cmd:MSG_AUTH_STATUS,status:1}
Copy the code

2.IMClient processes message types

MSG_IM: processes IM synchronization messages MSG_GROUP_IM: processes Group synchronization messages MSG_INPUTING: processes Inputing messages MSG_RT: processes real-time messages MSG_UNREAD_COUNT: MSG_SYNC_KEY: the client sends the SYNC_KEY to the server. MSG_SYNC_GROUP: the client requests the latest group message. MSG_GROUP_SYNC_KEY: The client uploads GROUP_SYNC_KEY to the serverCopy the code

RoomClient message type

MSG_ENTER_ROOM: enter the chat room MSG_LEAVE_ROOM: leave the chat room MSG_ROOM_IM: IM messages in the chat roomCopy the code

VOIPClient message type

MSG_VOIP_CONTROL: VOIP commandCopy the code

CustomerClient Message type

MSG_CUSTOMER: customer -> customer service MSG_CUSTOMER_SUPPORT: customer service -> customerCopy the code

3.MSG_IM processing flow: User A -> user B

1.SaveMessage: Saves the message to the target userBStorage queue (rpc->SavePeerMessage2).SaveMessage: Saves the message to the senderAStorage queue (for multipoint login synchronization messages) 3.PushMessage: External push messages to target usersB(byIMRRouting)MSG_IM
4.SendMessage: Sends a synchronization message to the target userB(External push + local address send)MSG_SYNC_NOTIFY
5.SendMessage: Sends a synchronization message to the senderA(Multi-point login)MSG_SYNC_NOTIFY
6.EnqueueMessage: Reply to this linkMSG_ACKThe messageCopy the code

4.MSG_GROUP_IM processing flow

By 1.group_managerQuery to specifiedgroup2. According toGroupType: 1..HandleSuperGroupMessage:SaveGroupMessage: saveMSG_GROUP_IMMessage (rpc->SaveGroupMessage)
        PushGroupMessage: External push group messageMSG_GROUP_IM
        SendGroupMessage: Send group synchronization notification messages (external push + local address push)MSG_SYNC_GROUP_NOTIFY
    2.HandleGroupMessage:group_message_deliver:saveMessage: Saves the message locallyMSG_PENDING_GROUP_MESSAGE
            ReadMessage: Read the messageMSG_PENDING_GROUP_MESSAGEFor each member of the group:SaveMessage: saveMSG_GROUP_IMThe messagePushMessage: External push messageMSG_GROUP_IM
                        SendMessage: Sends synchronization messagesMSG_SYNC_NOTIFY
3.EnqueueMessage: Reply to this linkMSG_ACKThe messageCopy the code

5. MSG_INPUTING:

SendMessage: Sent to the target userCopy the code

6.MSG_RT real-time message processing process

SendMessage: Sends a message to the target userCopy the code

7.MSG_UNREAD_COUNT sets user unread:

Hashkey :users_ is operated by redis_pool$appid_$uid field:unread
Copy the code

8. Message synchronization process:

Server -> client MSG_SYNC_NOTIFY: Client:1.isSyncing==false: sendSync Send old syncKey, request message MSG_SYNC, and change the status to synchronous2.isSyncing==trueIn the synchronous state, newSyncKey is saved in pendingSyncKey,this.pendingSyncKey = newSyncKey; Client -> server: MSG_SYNC Server:1.Sync_key passed from the client yields last_id, if last_id==0, retrieve the latest sync_key from Redis.2.RPC ->SyncMessage: Retrieves the most recent message MSGS from the cache based on last_id3.EnqueueMessage: send MSG_SYNC_BEGIN message (client does not process)4.EnqueueMessage: Sends MSGS in a loop5.EnqueueMessage: send the MSG_SYNC_END message containing the last MSG with the sync_key as the MsgID server -> client: MSG_SYNC_END Client:1.Remove newSyncKey (if newSyncKey>this.synckey, the client saves newSyncKey and sends it to the server.2.Switching the synchronization status isSyncing =false; 
    3.ifthis.pendingSyncKey > this.synCKEY: if a new MSG_SYNC_NOTIFY message was passed to the client during the last synchronization, sync again, sendSync: Send syncKey, pendingSyncKey: zero client -> server: MSG_SYNC_KEY1.Get last_id from sync_key,2.Wrap it in SyncHistory and write to the pipe sync_c < -sCopy the code





(IM architecture)


9. Super group synchronization process

MSG_SYNC_GROUP_NOTIFY Client:1.isSyncing==false: sendSync Send old syncKey, request message MSG_SYNC_GROUP, and the status changes to synchronous2.isSyncing==trueIn the synchronous state, newSyncKey is saved in pendingSyncKey,this.pendingSyncKey = newSyncKey; MSG_SYNC_GROUP Server:1Group_id, sync_key, last_id=sync_key, group_sync_key_$groupid, redis = new group_sync_key_$groupid2.RPC ->SyncGroupMessage: fetch the latest group message MSGS according to last_id3.EnqueueMessage: sends the MSG_SYNC_GROUP_BEGIN message4.EnqueueMessage: Sends MSGS in a loop5.EnqueueMessage: send MSG_SYNC_GROUP_END where sync_key is the MsgID server of the last MSG -> client MSG_SYNC_GROUP_END client:1.Remove GroupSyncKey. SyncKey and current syncKey (if GroupSyncKey. SyncKey is larger, the client will save the update and send it to the server MSG_GROUP_SYNC_KEY)2.Switching the synchronization status isSyncing =false; ifthis.pendingSyncKey > this.synCKEY: if a new MSG_SYNC_NOTIFY message has been passed to the client during the last synchronization, sync again, sendSync: send syncKey, pendingSyncKey: zero clients -> server MSG_GROUP_SYNC_KEY Server:1.Remove the group_id last_id2.Wrap it in SyncGroupHistory and write to the pipe group_sync_c < -sCopy the code

Differences between super groups and ordinary groups:

Messages from ordinary groups will be distributed by IMR to all users’ own message queues. Supergroups put messages in a single queue and let all users pull from that queue.



—————————————————————————————


IMR module

1.redis_pool

2.group_manager

3.ListenClient :4444

1.MSG_SUBSCRIBE
2.MSG_UNSUBSCRIBE
3.MSG_SUBSCRIBE_ROOM
4.MSG_UNSUBSCRIBE_ROOM(All four of the above are trueimrAdd, delete, modify and check the maintained routing table.MSG_PUBLISHObtain the message type and target user based on the message content. 2. Query the routing table, and put the message into the third-party push queue if the user is offline. Get the user connection based on the routing tableimNode, and pushes the message to the third filtering condition: 1. The message type is notMSG_IM,MSG_GROUP_IM,MSG_CUSTOMER,MSG_CUSTOMER_SUPPORT,MSG_SYSTEM2. The targetIMNode and sendIMThe nodes are different from node 6.MSG_PUBLISH_GROUPObtain the message type and group based on the message content. 2. Query the routing table for each member in the group. Group to allIMThe third filtering criteria are as follows: 1. The message type isMSG_PUBLISH_GROUP

7.MSG_PUBLISH_ROOM1. Obtain information based on the message contentroomid2. According toroomidQuery the routing table to obtain all nodes. 3. Send messages to each node. The sending node is not the same as the destination nodeCopy the code


IMR (architecture)

——————————————————————————–

IMS module (master/slave)

1.NewMaster

1.init: Creates container storageclients, create a queueewt
2.run: Listen on queueewtThat will beewtQueue messages are added to the cachecacheAn array ofcacheRun this command every 1000 minutes or every 1 minuteSendBatch.SendBatch: Encapsulates the messageMSG_STORAGE_SYNC_MESSAGE_BATCH, writes to each message queue connected from the nodeclient.ewt
Copy the code

2.NewSlaver listens on the primary node (optional)

run: Connects to the primary node and sends a message if the connection is successfulMSG_STORAGE_SYNC_BEGINLoop read message:MSG_STORAGE_SYNC_MESSAGE storage.SaveSyncMessage(emsg)
        MSG_STORAGE_SYNC_MESSAGE_BATCH storage.SaveSyncMessageBatch(mb)
Copy the code

3. WaitSignal processes interrupt SIGINT SIGTERM

storage.FlushPeerIndex() will everyone the latest messagemsgidWritten to the filestorage.FlushGroupIndex(a)Copy the code

4.ListenSyncClient master listens on 3334

Processing from the node connection RunLoop: 1. Initialization (synchronous) accept MSG_STORAGE_SYNC_BEGIN messages, to get the msgid, according to the msgid LoadSyncMessagesInBackground query to get the message, and sent to the node 2. Add a secondary node connection to clients 3. EnterforLoop that listens to the message queue client.ewt and sends it to the slave nodebreakAfter RemoveClientCopy the code

5. ListenRPCClient: 13333

SyncMessage SyncGroupMessage SavePeerMessage SaveGroupMessage GetNewCount Is called by IMCopy the code

6.ListenClient() 3333

For each connection: 1.init: Creates a write pipewt
    2.run: Write thread, fromwtThe pipe fetches the data and sends the reader thread,HandleMessage
Copy the code

7.HandleMessage

1.MSG_LOAD_OFFLINEIM: storage_client.LoadOfflineMessageResponse 2.MSG_SAVE_AND_ENQUEUEIM:storage_client.SaveAndEnqueueMessageResponse 3.MSG_DEQUEUEIM: storage_client.DequeueMessageResponse * 4.MSG_LOAD_LATESTIM-StartHttpServer-LoadLatestMessageResponse * 5.MSG_LOAD_HISTORYIM-StartHttpServer-LoadHistoryMessageIn response to 6.MSG_SAVE_AND_ENQUEUE_GROUPIM: storage_client.SaveAndEnqueueGroupMessageIn response to 7.MSG_DEQUEUE_GROUPIM :storage_client.DequeueGroupMessageIn response to eight.MSG_LOAD_GROUP_OFFLINEIM: storage_client.LoadGroupOfflineMessageResponse (above except 4,5, not yetIMModule call)Copy the code

8. Message storage mechanism


All MSQ messages will be stored in the message_0 file. The maximum size of the file is 128*1024*1024(128M). When the storage is full, the next block(Message_1) will be stored in increasing numbers.

To store a message, two pieces of information are stored, MSG, the message body, and MSG_OFFLINE, the message message.


Store information = MAGIC + MSG + MAGIC

const MAGIC = 0x494d494dCopy the code


MSG->msg

Package: header (12) | body (length) header: Length + seq + CMD + version (+3BytePadding) Body length + message offset + message type + version (+3 placeholders) body[:24] : Sender + Reader + timestamp +... + content(json)Copy the code


MSG_OFFLINE->msg

Package: header (12) | body (length) header: Length + seq + CMD + version (+3BytePadding) Body length + message offset + message type + version (+3 placeholders) appid+receiver+msgid,device_id,prev_msgidCopy the code


Client gets historical messages:


Client client1 – sync_key 1345 -> IM -> IMS Searches for client1’s latest sync_key and compares whether sync_key is the same as the one carried by the client. If not, it is 3985. Then the message is searched in chain from sync_KEY3985, all messages remember the last message, until sync_key1345 is found. All information is returned to the client.




(IMS architecture)



Project deployment

  • Install the GO build environment

    Reference link :golang.org/doc/install

  • Download the IM_service code

    cd $GOPATH/src/github.com/GoBelieveIO

    Git clone github.com/GoBelieveIO…

  • Compiling proto files (optional)

    cd im_service

    // Pay attention to the need to climb the wall

    go get google.golang.org/grpc

    go get -u github.com/golang/protobuf/{proto,protoc-gen-go}

    export PATH=$PATH:$GOPATH/bin

    protoc -Irpc/ rpc/rpc.proto –go_out=plugins=grpc:rpc

    python -m grpc.tools.protoc -Irpc –python_out=rpc/ –grpc_python_out=rpc/ rpc/rpc.proto

  • compile

    cd im_service

    mkdir bin

    go get github.com/bitly/go-simplejson

    go get github.com/golang/glog

    go get github.com/go-sql-driver/mysql

    go get github.com/garyburd/redigo/redis

    go get github.com/googollee/go-engine.io

    go get github.com/richmonkey/cfg

    go get github.com/valyala/gorpc

    // Note that you need to climb the wall (optional)

    go get google.golang.org/grpc

    make install

    The executable program is in the bin directory

  • Install mysql database, redis, and import db.sql

  • For description of configuration items, see ims.cfg.sample, IMr.cfg. sample, and im.cfg.sample

  • In the /bin directory, the files are im, imr, ims, imr. CFG, im. CFG, im.cfg, run.sh, stop.sh
  • Create a path folder in the configuration item file:
  • For example, / TMP /pending, / TMP /im

  • Script is run. Sh:
    #! /bin/sh
    pushd `dirname $0` > /dev/null
    BASEDIR=`pwd`
    popd > /dev/null
    
    #-cpuprofile=/tmp/ims_profile
    $BASEDIR/ims  -logtostderr=true ims.cfg >ims.log 2>&1 &
    $BASEDIR/imr -logtostderr=true imr.cfg >imr.log 2>&1 &
    $BASEDIR/im -logtostderr=true im.cfg >im.log 2>&1 &
    Copy the code
  • The script stop. Sh
  • #! /bin/sh
    killall im
    killall imr
    killall ims
    Copy the code


  • . / run. Sh
  • Process have opened ps – ef | grep im_service
  • Check whether netstat -lntp is enabled for the port of the configuration file
  • Log Indicates whether the log is normal
  • After ensuring that all of the above is successfully deployed, invoke the test script:
  • https://github.com/GoBelieveIO/gobelieve_vagrant/tree/master/cli

    python gobelieve_client. py

    Test success print:

    cmd: 27 msg: 0
    cmd: 28 msg: 0
    cmd: 29 msg: 32
    cmd: 27 msg: 32
    cmd: 4 msg: (13800000001, 13800000000, 1526004909, 'test im 6720261727')
    cmd: 28 msg: 32
    recv message success
    send success
    test peer message completedCopy the code

    Verify that the program sends and receives normally.

  • Deployment is missing something can refer to, some directories or files are not necessarily used, please take use:
  • #! /bin/bash
    apt-get -y update
    
    #install mysql
    apt-get install debconf-utils -y
    debconf-set-selections <<< "mysql-server mysql-server/root_password password GoBelieve123456"
    debconf-set-selections <<< "mysql-server mysql-server/root_password_again password GoBelieve123456"
    apt-get install mysql-server -y
    
    
    
    
    #install redis
    
    if[!-f  "/usr/local/bin/redis-server" ]; then
        apt-get -y install make
        mkdir /opt/redis
        cd /opt/redis
        # Use latest stable
        wget -q http://download.redis.io/redis-stable.tar.gz
        # Only update newer files
        tar -xz --keep-newer-files -f redis-stable.tar.gz
         
        cd redis-stable
        make
        make install
        rm -f /etc/redis.conf
        cp -u /vagrant/redis.conf /etc/redis.conf
    fi
    
    #install python & python depApt-get install -y build-essential python2.7 python-dev python /vagrant/get-pip.pycd /vagrant/ && pip install -r requirements.txt
    
    
    #create group databasemysql -u root -pGoBelieve123456 < /vagrant/db.sql mkdir -p /data/redis mkdir -p /data/im mkdir -p /data/im_pending mkdir  -p /data/wwwroot mkdir -p /data/logs/im mkdir -p /data/logs/ims mkdir -p /data/logs/imrif[!-d "/data/wwwroot/im_bin" ]; then
        cp -r /vagrant/im_bin /data/wwwroot/im_bin
    fi
    
    if[!-d "/data/wwwroot/cli" ]; then
        cp -r /vagrant/cli /data/wwwroot/cli
    fi
    Copy the code