This article introduces how to implement a distributed chat (IM) system based on WebSocket.

The system is written in Golang and talks to clients over WebSocket, and a single machine can support millions of connections. It uses the Gin framework, Nginx for load balancing, and horizontal deployment, with gRPC for communication between program instances.

If you just want to try it out, clone the goWebSocket project directly. The article first explains what WebSocket is, then introduces the project and shows how to configure a domain name in Nginx to forward WebSocket traffic, and finally describes how to build a distributed system.

Contents

  • 1. Project Description
    • 1.1 goWebSocket
    • 1.2 Try the Project
  • 2. Introduction to WebSocket
    • 2.1 What Is WebSocket
    • 2.2 WebSocket Compatibility
    • 2.3 Why Use WebSocket
    • 2.4 How a WebSocket Connection Is Established
  • 3. Implementing a Long-Connection System with WebSocket
    • 3.1 Implementing the WebSocket Server in Go
      • 3.1.1 Listening on a Port
      • 3.1.2 Upgrading the Protocol
      • 3.1.3 Client Connection Management
      • 3.1.4 Registering the Asynchronous Write Handler for the Client Socket
      • 3.1.5 Registering the Asynchronous Read Handler for the Client Socket
      • 3.1.6 Receiving and Processing Client Data
      • 3.1.7 Routing Client Requests
      • 3.1.8 Preventing Memory Leaks and Leaked Goroutines
    • 3.2 Implementing the WebSocket Client in JavaScript
      • 3.2.1 Connecting and Registering Event Listeners
      • 3.2.2 Sending Data
  • 4. The goWebSocket Project
    • 4.1 Project Description
    • 4.2 Project Dependencies
    • 4.3 Starting the Project
  • 5. Nginx Configuration for the WebSocket Project
    • 5.1 Why Configure Nginx
    • 5.2 Nginx Configuration
    • 5.3 Troubleshooting
  • 6. Load Testing
    • 6.1 Linux Kernel Tuning
    • 6.2 Load Test Preparation
    • 6.3 Load Test Data
  • 7. Implementing a Distributed IM System Based on WebSocket
    • 7.1 Overview
    • 7.2 Architecture
    • 7.3 Distributed Deployment
  • 8. Review and Reflections
    • 8.1 Use in Other Systems
    • 8.2 Implemented Functions
    • 8.3 To Be Improved and Optimized
    • 8.4 Summary
  • 9. References

1. Project Description

1.1 goWebSocket


(Figure: architecture diagram of how WebSocket is used in a typical project)

1.2 Try the Project

  • Project address: gowebSocket
  • IM chat home page: im.91vh.com/home/index
  • After the connection opens, the chat screen is displayed
  • To try group chat, open two windows at the same time

2. Introduction to WebSocket

2.1 What Is WebSocket

The WebSocket protocol was born in 2008 and became an international standard in 2011. All modern browsers support it.

Its defining feature is that the server can proactively push messages to the client, and the client can also proactively send messages to the server. It is a true two-way conversation between equals and is a form of server push technology.

(Figure: comparison of the HTTP and WebSocket communication processes)

  • Like HTTP and HTTPS, WebSocket can run with or without a TLS certificate: ws:// is the scheme without a certificate, and wss:// is the scheme for a connection secured with a certificate

2.2 WebSocket Compatibility

  • Browser compatibility (figure: the browser versions that first supported WebSocket)

  • Server-side support

Golang, Java, PHP, Node.js, Python, and Nginx all have good support

  • Android and iOS support

Android can use the Java-WebSocket library for WebSocket support

iOS 4.2 and later support WebSockets

2.3 Why Use WebSocket

    1. The business requires the server to reach the client proactively

Today most requests use HTTP: every request is initiated by the client, processed by the server, and answered with a result. The server cannot proactively send data to a client.

    2. Many scenarios require notifying users proactively, such as a chat system, notifying users when they complete a task, or notifying online users about operational campaigns
    3. It lets the server know whether a user is online
    4. Without a long connection, clients have to fetch data by actively polling
    5. One implementation can be reused across many platforms (H5/Android/iOS)

2.4 How a WebSocket Connection Is Established

    1. The client initiates a protocol upgrade request

The client sends the upgrade request in standard HTTP format and adds the following headers:

Connection: Upgrade indicates that the connection needs to be upgraded

Upgrade: websocket indicates that the connection should be upgraded to the WebSocket protocol

Sec-WebSocket-Version: 13 indicates that the protocol version is 13

Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA== is a base64-encoded value randomly generated by the browser; the server's Sec-WebSocket-Accept response header is derived from it

# Request Headers
Connection: Upgrade
Host: im.91vh.com
Origin: http://im.91vh.com
Pragma: no-cache
Sec-WebSocket-Extensions: permessage-deflate; client_max_window_bits
Sec-WebSocket-Key: I6qjdEaqYljv3+9x+GrhqA==
Sec-WebSocket-Version: 13
Upgrade: websocket

    2. The server responds to the upgrade request

If the server supports the protocol upgrade, it responds as follows:

Status Code: 101 Switching Protocols, indicating that the server agrees to switch protocols

# Response Headers
Connection: upgrade
Date: Fri, 09 Aug 2019 07:36:59 GMT
Sec-WebSocket-Accept: mB5emvxi2jwTUhDdlRtADuBax9E=
Server: nginx/1.12.1
Upgrade: websocket

    3. After the upgrade completes, the client and server can send data to each other
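The Sec-WebSocket-Accept value is not random. The server computes it from the client's Sec-WebSocket-Key by appending the fixed GUID defined in RFC 6455, hashing the result with SHA-1, and base64-encoding the digest. gorilla/websocket does this inside Upgrade, so the project never has to. The standalone Go sketch below only illustrates the rule, using the sample key from RFC 6455. The function name computeAccept is our own:

```go
package main

import (
	"crypto/sha1"
	"encoding/base64"
	"fmt"
)

// websocketGUID is the fixed GUID defined by RFC 6455 for the handshake.
const websocketGUID = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"

// computeAccept derives the Sec-WebSocket-Accept value for a given
// Sec-WebSocket-Key: base64(SHA-1(key + GUID)).
func computeAccept(secWebSocketKey string) string {
	sum := sha1.Sum([]byte(secWebSocketKey + websocketGUID))
	return base64.StdEncoding.EncodeToString(sum[:])
}

func main() {
	// Sample key from RFC 6455.
	fmt.Println(computeAccept("dGhlIHNhbXBsZSBub25jZQ==")) // s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
}
```

A client that receives any other Accept value must fail the connection. This check is how the client confirms that the server actually understood the WebSocket handshake.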

3. Implementing a Long-Connection System with WebSocket

3.1 Implementing the WebSocket Server in Go

3.1.1 Listening on a Port

  • The WebSocket server listens on its own port, so the program starts it as a goroutine from the main function
  • Start it in main.go
go websocket.StartWebSocket()
  • init_acc.go starts the listener
// StartWebSocket registers the /acc handler and listens on port 8089
func StartWebSocket() {
	http.HandleFunc("/acc", wsPage)
	// ListenAndServe blocks and only returns on failure, such as the port already being in use
	if err := http.ListenAndServe(":8089", nil); err != nil {
		fmt.Println("WebSocket server stopped:", err)
	}
}

3.1.2 Upgrading the Protocol

  • The client sends an HTTP request to the server, and we need to upgrade it from HTTP to the WebSocket protocol
  • The gorilla/websocket library for Go handles the HTTP protocol upgrade well, so we can simply use it
  • In practice, each connection should use two goroutines: one reads requests from the client and one sends data to the client. Each goroutine takes some memory, but separating reads from writes reduces the chance of data congestion
  • init_acc.go
func wsPage(w http.ResponseWriter, req *http.Request) {
	// Upgrade the protocol
	conn, err := (&websocket.Upgrader{CheckOrigin: func(r *http.Request) bool {
		fmt.Println("upgrade protocol", "ua:", r.Header["User-Agent"], "referer:", r.Header["Referer"])

		// Accepts every origin; restrict this in production
		return true
	}}).Upgrade(w, req, nil)
	if err != nil {
		// Upgrade has already replied to the client with an HTTP error
		fmt.Println("upgrade failed:", err)
		return
	}

	fmt.Println("WebSocket connection established:", conn.RemoteAddr().String())
	currentTime := uint64(time.Now().Unix())
	client := NewClient(conn.RemoteAddr().String(), conn, currentTime)

	go client.read()
	go client.write()

	// User connection event
	clientManager.Register <- client
}

3.1.3 Client Connection Management

  • The program needs to know how many users are connected and must be able to broadcast to them, so a manager (clientManager) handles these events:
  • Record all connections and logged-in users; a user's connection can be looked up by appId+uuid
  • The maps are read and written concurrently by multiple goroutines, so read/write locks are required
  • Four channels handle the client connection, user login, disconnection, and broadcast-to-all events respectively
// ClientManager manages connections
type ClientManager struct {
	Clients     map[*Client]bool   // all connections
	ClientsLock sync.RWMutex       // read/write lock for Clients
	Users       map[string]*Client // logged-in users, keyed by appId+uuid
	UserLock    sync.RWMutex       // read/write lock for Users
	Register    chan *Client       // connection events
	Login       chan *login        // user login events
	Unregister  chan *Client       // disconnection events
	Broadcast   chan []byte        // broadcast to all members
}

// NewClientManager initializes the manager
func NewClientManager() (clientManager *ClientManager) {
	clientManager = &ClientManager{
		Clients:    make(map[*Client]bool),
		Users:      make(map[string]*Client),
		Register:   make(chan *Client, 1000),
		Login:      make(chan *login, 1000),
		Unregister: make(chan *Client, 1000),
		Broadcast:  make(chan []byte, 1000),
	}

	return
}

3.1.4 Registering the Asynchronous Write Handler for the Client Socket

  • Recover from panics to keep the program from crashing
  • string(debug.Stack()) prints the call stack so the location of a crash can be found
  • If writing data fails, the connection may be broken, so the connection is closed
  • client.go
// write sends queued data to the client
func (c *Client) write() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("write stop", string(debug.Stack()), r)
		}
	}()

	defer func() {
		clientManager.Unregister <- c
		c.Socket.Close()
		fmt.Println("client write goroutine exiting", c)
	}()

	for {
		select {
		case message, ok := <-c.Send:
			if !ok {
				// Send was closed by read(): close the connection
				fmt.Println("client send channel closed, closing connection", c.Addr, "ok", ok)

				return
			}

			if err := c.Socket.WriteMessage(websocket.TextMessage, message); err != nil {
				// A failed write means the connection is probably broken
				fmt.Println("write to client failed", c.Addr, err)
				return
			}
		}
	}
}

3.1.5 Registering the Asynchronous Read Handler for the Client Socket

  • Read the data sent by the client in a loop and process it
  • If reading fails, close the Send channel
  • client.go
// read receives data from the client in a loop
func (c *Client) read() {
	defer func() {
		if r := recover(); r != nil {
			fmt.Println("read stop", string(debug.Stack()), r)
		}
	}()

	defer func() {
		fmt.Println("read goroutine exiting, closing send", c)
		close(c.Send)
	}()

	for {
		_, message, err := c.Socket.ReadMessage()
		if err != nil {
			fmt.Println("error reading client data", c.Addr, err)

			return
		}

		// Handle the message
		fmt.Println("processing client data:", string(message))
		ProcessData(c, message)
	}
}

3.1.6 Receiving and Processing Client Data

  • Agree on a request and response format. To make processing in JS easy, data is sent and received as JSON (a human-readable format is also more convenient during development)

  • Example of the data sent for a login:

{"seq":"1565336219141-266129","cmd":"login","data":{"userId":"Ma Yuan","appId":101}}
  • Example of the login response:
{"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
  • WebSocket is two-way and either side can keep sending. When a request needs a reply from the server, seq identifies which request a response belongs to

  • cmd determines the action. WebSocket has no URL as HTTP does, so cmd plays that role

  • The current actions are login and heartbeat: login sends a login request, and heartbeat keeps the connection alive (long connections that stay idle for a long time are easily dropped by browsers, mobile network intermediaries, Nginx, and server programs)

  • Why is appId needed? userId uniquely identifies the user. To keep the design general, appId identifies the platform the user logs in from (web, app, iOS, etc.), which makes later extension easier

  • request_model.go defines the request data format

/************************ request data **************************/
// Request is the common request format
type Request struct {
	Seq  string      `json:"seq"`            // unique message ID
	Cmd  string      `json:"cmd"`            // request command
	Data interface{} `json:"data,omitempty"` // data as JSON
}

// Login is the login request data
type Login struct {
	ServiceToken string `json:"serviceToken"` // verifies the user login
	AppId        uint32 `json:"appId,omitempty"`
	UserId       string `json:"userId,omitempty"`
}

// HeartBeat is the heartbeat request data
type HeartBeat struct {
	UserId string `json:"userId,omitempty"`
}
  • response_model.go
/************************ response data **************************/
type Head struct {
	Seq      string    `json:"seq"`      // message ID
	Cmd      string    `json:"cmd"`      // message cmd
	Response *Response `json:"response"` // message body
}

type Response struct {
	Code    uint32      `json:"code"`
	CodeMsg string      `json:"codeMsg"`
	Data    interface{} `json:"data"` // data as JSON
}
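The following sketch shows the two formats working together in a login handler that decodes the request and builds the reply. It assumes Data is declared as json.RawMessage instead of interface{}, so the body can be decoded into Login once cmd is known. The handleLogin function and its userId check are hypothetical, and token validation is omitted:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Request mirrors request_model.go, except Data is json.RawMessage (assumption).
type Request struct {
	Seq  string          `json:"seq"`
	Cmd  string          `json:"cmd"`
	Data json.RawMessage `json:"data,omitempty"`
}

type Login struct {
	ServiceToken string `json:"serviceToken"`
	AppId        uint32 `json:"appId,omitempty"`
	UserId       string `json:"userId,omitempty"`
}

type Head struct {
	Seq      string    `json:"seq"`
	Cmd      string    `json:"cmd"`
	Response *Response `json:"response"`
}

type Response struct {
	Code    uint32      `json:"code"`
	CodeMsg string      `json:"codeMsg"`
	Data    interface{} `json:"data"`
}

// handleLogin decodes a login request and builds the reply, echoing seq so
// the client can match the response to its request.
func handleLogin(raw []byte) ([]byte, error) {
	var req Request
	if err := json.Unmarshal(raw, &req); err != nil {
		return nil, err
	}
	var login Login
	if err := json.Unmarshal(req.Data, &login); err != nil {
		return nil, err
	}
	if login.UserId == "" {
		return nil, fmt.Errorf("seq %s: userId is required", req.Seq)
	}
	// A real handler would also check login.ServiceToken and register the user.
	head := Head{
		Seq:      req.Seq,
		Cmd:      req.Cmd,
		Response: &Response{Code: 200, CodeMsg: "Success"},
	}
	return json.Marshal(head)
}

func main() {
	reply, err := handleLogin([]byte(`{"seq":"1565336219141-266129","cmd":"login","data":{"userId":"Ma Yuan","appId":101}}`))
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(string(reply))
	// {"seq":"1565336219141-266129","cmd":"login","response":{"code":200,"codeMsg":"Success","data":null}}
}
```

The output matches the login response example above. The client pairs this reply with its original request because seq is echoed back unchanged.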

3.1.7 Routing Client Requests

  • Requests sent by the client are dispatched through routing
  • When new request types are added later, they can be handled with a router-controller pattern similar to HTTP
  • acc_routers.go
// WebsocketInit registers the WebSocket routes
func WebsocketInit() {
	websocket.Register("login", websocket.LoginController)
	websocket.Register("heartbeat", websocket.HeartbeatController)
}

3.1.8 Preventing Memory Leaks and Leaked Goroutines

    1. A scheduled task clears timed-out connections: a connection, whether logged in or not, is closed if no heartbeat is received within 6 minutes

client_manager.go

// ClearTimeoutConnections periodically closes timed-out connections
func ClearTimeoutConnections() {
    currentTime := uint64(time.Now().Unix())

    // Clients is also written by the manager goroutine, so hold the read lock
    clientManager.ClientsLock.RLock()
    defer clientManager.ClientsLock.RUnlock()

    for client := range clientManager.Clients {
        if client.IsHeartbeatTimeout(currentTime) {
            fmt.Println("heartbeat timed out, closing connection", client.Addr, client.UserId, client.LoginTime, client.HeartbeatTime)

            client.Socket.Close()
        }
    }
}

    2. The read and write goroutines shut each other down when one fails: if the write() goroutine fails to write data, it calls c.Socket.Close(), which makes the read() goroutine exit; if the read() goroutine fails to read data, it calls close(c.Send), which makes the write() goroutine exit
    3. When the client closes the connection, the read and write goroutines exit and the connection is removed from ClientManager
    4. Monitor the number of user connections and goroutines (nine out of ten memory leaks involve goroutines): add an HTTP interface for viewing system status so that goroutines that are never reclaimed are caught
    5. Configure Nginx to release inactive connections, in case the program forgets to close them
    6. Use pprof to analyze performance and latency
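Item 1 relies on client.IsHeartbeatTimeout, which the article does not show. The following is a minimal sketch. It assumes HeartbeatTime stores Unix seconds and the window is the 6 minutes mentioned above. As a variation on ClearTimeoutConnections, it collects expired clients under the read lock and closes them after releasing it. The trimmed Client, manager, and runEvery are hypothetical names, and setting closed stands in for c.Socket.Close():

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// heartbeatExpirationTime is the 6-minute window, in seconds.
const heartbeatExpirationTime = 6 * 60

// Client is a hypothetical trimmed client with only what this sketch needs.
type Client struct {
	Addr          string
	HeartbeatTime uint64 // Unix seconds of the last heartbeat (or connect time)
	closed        bool   // stands in for the real c.Socket.Close()
}

// IsHeartbeatTimeout reports whether the last heartbeat is older than the window.
func (c *Client) IsHeartbeatTimeout(currentTime uint64) bool {
	return c.HeartbeatTime+heartbeatExpirationTime <= currentTime
}

type manager struct {
	mu      sync.RWMutex
	clients map[*Client]bool
}

// clearTimeoutConnections snapshots expired clients under the read lock and
// closes them after releasing it, so no lock is held during shutdown.
func (m *manager) clearTimeoutConnections(now uint64) []*Client {
	m.mu.RLock()
	var expired []*Client
	for c := range m.clients {
		if c.IsHeartbeatTimeout(now) {
			expired = append(expired, c)
		}
	}
	m.mu.RUnlock()

	for _, c := range expired {
		c.closed = true
	}
	return expired
}

// runEvery calls fn on each tick until stop is closed.
func runEvery(interval time.Duration, stop <-chan struct{}, fn func()) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fn()
		case <-stop:
			return
		}
	}
}

func main() {
	now := uint64(time.Now().Unix())
	fresh := &Client{Addr: "fresh", HeartbeatTime: now - 60}
	stale := &Client{Addr: "stale", HeartbeatTime: now - 7*60}
	m := &manager{clients: map[*Client]bool{fresh: true, stale: true}}

	for _, c := range m.clearTimeoutConnections(now) {
		fmt.Println("closed", c.Addr) // closed stale
	}

	// In the server the check runs periodically (the interval is illustrative):
	//   go runEvery(30*time.Second, stop, func() { m.clearTimeoutConnections(uint64(time.Now().Unix())) })
}
```

In the real server, closing the socket makes read() fail. That failure then shuts down write() and unregisters the client, as described in item 2.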

3.2 Implementing the WebSocket Client in JavaScript

3.2.1 Connecting and Registering Event Listeners

  • JS establishes the connection and handles the connection-opened, message-received, and connection-closed events
var ws = new WebSocket("ws://127.0.0.1:8089/acc");

ws.onopen = function(evt) {
  console.log("Connection open ...");
};

ws.onmessage = function(evt) {
  console.log("Received Message: " + evt.data);
  var data_array = JSON.parse(evt.data);
  console.log(data_array);
};

ws.onclose = function(evt) {
  console.log("Connection closed.");
};

3.2.2 Sending Data

  • Data can be sent only after the connection has been established
  • Examples of data the client sends to the server once connected:
// Login
ws.send('{"seq":"2323","cmd":"login","data":{"userId":"11","appId":101}}');
// Heartbeat
ws.send('{"seq":"2324","cmd":"heartbeat","data":{}}');
// Ping, to check whether the service is working
ws.send('{"seq":"2325","cmd":"ping","data":{}}');
// Close the connection
ws.close();

4. The goWebSocket Project

4.1 Project Description

  • This project is a distributed IM system based on WebSocket

  • Each client is assigned a random user name, and everyone joins one chat room, which implements group chat

  • A single machine (24 cores, 128 GB of memory) supports millions of client connections

  • Horizontal deployment is supported, and the deployed machines can communicate with each other

(Figure: project architecture diagram)

4.2 Project Dependencies

  • The project only requires Redis and Golang
  • Dependencies are managed with govendor, so a cloned copy of the project can be used directly
# Main packages used
github.com/gin-gonic/gin
github.com/go-redis/redis
github.com/gorilla/websocket
github.com/spf13/viper
google.golang.org/grpc
github.com/golang/protobuf

4.3 Starting the Project

  • Clone the project
git clone git@github.com:link1st/gowebsocket.git
# or
git clone https://github.com/link1st/gowebsocket.git
  • Edit the project configuration
cd gowebsocket
cd config
mv app.yaml.example app.yaml
# Change the listening ports, the Redis connection, etc. (the example configuration points Redis at localhost:6379)
vim app.yaml
# Return to the project directory to prepare for starting it
cd ..
  • Configuration file description
app:
  logFile: log/gin.log # log file location
  httpPort: 8080 # HTTP port
  webSocketPort: 8089 # WebSocket port
  rpcPort: 9001 # internal communication port for distributed deployment
  httpUrl: 127.0.0.1:8080
  webSocketUrl: 127.0.0.1:8089

redis:
  addr: "localhost:6379"
  password: ""
  DB: 0
  poolSize: 30
  minIdleConns: 30
  • Start the project
go run main.go
  • Open the IM chat at http://127.0.0.1:8080/home/index
  • You can now try out the WebSocket-based IM system

5. Nginx Configuration for the WebSocket Project

5.1 Why Configure Nginx

  • Nginx separates the internal and external networks: only the Nginx IP is exposed externally (Internet companies usually also put an LVS load-balancing layer in front of Nginx), which reduces the risk of intrusion
  • Nginx provides load balancing: the front end only connects to a fixed domain name, and Nginx distributes the traffic to different machines
  • Nginx's different load-balancing strategies (round robin, weight, ip_hash) are also available

5.2 Nginx Configuration

  • The domain name im.91vh.com is used as an example
  • The im.91vh.com/acc path is for WebSocket. Nginx proxies it to port 8089 of the Golang program (WebSocket proxying is supported from Nginx 1.3.13, and Tengine is configured the same way)
  • All other paths are for HTTP and are forwarded to port 8080 of the Golang program
upstream go-im {
    server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

upstream go-acc {
    server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

server {
    listen       80;
    server_name  im.91vh.com;
    index index.html index.htm;

    location /acc {
        proxy_set_header Host $host;
        proxy_pass http://go-acc;
        proxy_http_version 1.1;
        proxy_set_header Upgrade $http_upgrade;
        proxy_set_header Connection $connection_upgrade;
        proxy_redirect off;
        proxy_intercept_errors on;
        client_max_body_size 10m;
    }

    location /
    {
        proxy_set_header Host $host;
        proxy_pass http://go-im;
        proxy_http_version 1.1;
        proxy_set_header Connection "";
        proxy_redirect off;
        proxy_intercept_errors on;
        client_max_body_size 30m;
    }

    access_log  /link/log/nginx/access/im.log;
    error_log   /link/log/nginx/access/im.error.log;
}

5.3 Troubleshooting

  • Run the nginx test command to check whether the configuration file is valid
/link/server/tengine/sbin/nginx -t
  • If it reports this error
nginx: [emerg] unknown "connection_upgrade" variable
configuration file /link/server/tengine/conf/nginx.conf test failed
  • How to fix it
  • Add the following to nginx.conf
http {
    fastcgi_temp_file_write_size 128k;
    ...
    # Content to add
    # support websocket
    map $http_upgrade $connection_upgrade {
        default upgrade;
        ''      close;
    }
    ...
    gzip on;
}
  • Cause: when Nginx proxies WebSocket, it runs into the distinction between end-to-end and hop-by-hop headers. Upgrade and Connection are hop-by-hop headers, so Nginx does not pass them from the client to the backend, and they must be set explicitly with proxy_set_header

6. Load Testing

6.1 Linux Kernel Tuning

  • Set the maximum number of open file handles
ulimit -n 1000000
  • Set socket connection parameters in /etc/sysctl.conf and apply them with sysctl -p (tcp_tw_recycle no longer exists on Linux 4.12 and later, so omit that line there)
vim /etc/sysctl.conf
net.ipv4.tcp_tw_reuse = 1
net.ipv4.tcp_tw_recycle = 0

6.2 Load Test Preparation

  • Load testing is still pending; if you have load test results, contributions are welcome
  • A dedicated tutorial will follow on provisioning machines, writing load test cases, tuning the kernel, and collecting load test data

6.3 Load Test Data

  • In actual use, each connection takes about 24 KB of memory, and one goroutine takes about 11 KB
  • Supporting one million connections requires about 22 GB of memory
Online users | CPU | Memory | I/O | net.out
10,000 | | | |
100,000 | | | |
1,000,000 | | | |
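As a rough cross-check of the memory estimate, assume the 24 KB figure is the total cost of one connection, including its goroutines, and that 1 KB = 1024 bytes:

$$
10^{6} \times 24\,\text{KB} = 2.4 \times 10^{7}\,\text{KB} = 2.4576 \times 10^{10}\,\text{bytes} \approx \frac{2.4576 \times 10^{10}}{2^{30}}\,\text{GiB} \approx 22.9\,\text{GiB}
$$

That is close to the 22 GB quoted above.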

7. Implementing a Distributed IM System Based on WebSocket

7.1 Overview

  • Refer to the project's source code

  • gowebsocket v1.0.0: single-machine IM system

  • gowebsocket v2.0.0: distributed IM system

  • For demonstration purposes, the IM system and the WebSocket (ACC) system are combined into one program

  • IM system interfaces: get all online users (the users on this server plus the users on every other server in the cluster) and send messages. Messages are sent through an HTTP interface (the WeChat web client also sends messages over HTTP) for two main reasons: 1. Service separation: keep the ACC system as simple as possible, without mixing in other business logic. 2. Sending messages over HTTP instead of the WebSocket connection separates sending from receiving, which makes both more efficient

7.2 Architecture

(Figure: sequence diagram of project startup registration and user connection)

(Figure: sequence diagram of other systems (IM, tasks) sending messages to users connected to the WebSocket (ACC) system)

7.3 Distributed Deployment

  • Split deployment is demonstrated by deploying two copies of the project horizontally (gowebsocket and gowebsocket1)
  • How the instances communicate: after an instance starts, it registers its IP and rpcPort in Redis so that other instances can discover it, and they use gRPC when they need to communicate
  • gowebsocket
# app.yaml configuration
app:
  logFile: log/gin.log
  httpPort: 8080
  webSocketPort: 8089
  rpcPort: 9001
  httpUrl: im.91vh.com
  webSocketUrl: im.91vh.com

# Start the project
go run main.go
  • gowebsocket1
# Make a copy of the first project
cp -rf gowebsocket gowebsocket1
# Edit the app.yaml configuration
app:
  logFile: log/gin.log
  httpPort: 8081
  webSocketPort: 8090
  rpcPort: 9002
  httpUrl: im.91vh.com
  webSocketUrl: im.91vh.com

# Start the second project
go run main.go
  • Nginx configuration

Add the IP and port of the second instance to the earlier Nginx configuration

upstream go-im {
    server 127.0.0.1:8080 weight=1 max_fails=2 fail_timeout=10s;
    server 127.0.0.1:8081 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}

upstream go-acc {
    server 127.0.0.1:8089 weight=1 max_fails=2 fail_timeout=10s;
    server 127.0.0.1:8090 weight=1 max_fails=2 fail_timeout=10s;
    keepalive 16;
}
  • Restart Nginx after the configuration is complete
  • After restarting, verify that everything works as expected:

Check that requests land on both instances, and test whether two users connected to different instances (gowebsocket and gowebsocket1) can send messages to each other

  • About distributed deployment

This project only demonstrates how to deploy it in a distributed way and how the modules communicate with each other afterwards. To fully eliminate single points of failure, an Nginx cluster, a Redis cluster, and so on are also needed

8. Review and Reflections

8.1 Use in Other Systems

  • The system was designed to maintain long connections with clients and to expose two interfaces to external systems (query whether a user is online, and push messages to online users), which keeps it separate from business logic
  • Once it is separated in this way, multiple services can share it instead of each service establishing its own long connections

8.2 Implemented Functions

  • Gin logs (request log + debug log)
  • Reading the configuration file: done
  • Scheduled task that cleans up expired connections without heartbeats
  • HTTP interface for getting the number of logins and connections: done
  • HTTP interfaces for sending pushes and querying how many people are online: done
  • gRPC internal communication for sending messages: done
  • appId: one user logging in on multiple platforms
  • Interface that puts all online users into one group and sends a message: done
  • Single chat and group chat: done
  • Distributed deployment and horizontal scaling: done
  • Load test script
  • Documentation cleanup
  • Document outline: achieving a million long connections, why implement an IM, how to implement an IM
  • Architecture diagrams and extensions

IM implementation details:

  • Define the text message structure: done
  • Send text messages from HTML: done
  • The interface receives a text message and sends it to everyone: done
  • HTML receives messages and displays them on the page
  • UI optimization: ongoing
  • Broadcast when someone joins: done
  • Define the message structure for joining the chat room: done
  • Introduce bots: TBD

8.3 To Be Improved and Optimized

  • Login: use WeChat login to get a nickname, profile picture, etc.
  • An account system and a data system
  • UI optimization and mobile adaptation
  • Message types: text messages (with emoticons), images, voice, and video
  • Microservice registration, discovery, circuit breaking, etc.
  • Add a configuration item for the maximum number of connections per machine

8.4 Summary

  • This implements a distributed IM chat, but many details are not handled yet (login has no authentication, the UI still needs optimization, etc.). Even so, the example shows that WebSocket can solve many business requirements
  • This article claims a single machine can hold millions of long connections (memory is sufficient), but real scenarios are far more complex (CPU pressure, for one). If your business really has that volume, you can add machines to support it. This program only demonstrates how to use WebSocket in real work
  • With this article, you can implement a program that meets your own needs

9. References

Wikipedia: WebSocket

Ruan Yifeng: WebSocket tutorial

WebSocket protocol: from beginner to master in 5 minutes

link1st: gowebsocket

GitHub: search for link1st to find the gowebsocket project

Github.com/link1st/gow…