Go Network Programming (Part 1): TCP Connection Communication
TCP Protocol Overview
In the seven-layer OSI model, TCP sits below HTTP, and an HTTP connection is built on top of an underlying TCP connection. TCP connection identifier: when two computers establish a network connection (the handshake), each side ends up holding a file descriptor (FD) for its socket, so a connection is essentially an association between two file descriptors. Each connection is uniquely identified by four attributes: <source IP address, source port, destination IP address, destination port>. Because every connection consumes a file descriptor, the number of connections a machine can hold is limited by its file descriptor limit (ulimit).
Keepalive: HTTP versus TCP
- HTTP keepalive
HTTP is a stateless protocol, and without keepalive a connection is usually closed as soon as its request has been served. Enabling keepalive keeps the connection open for a period of time so that later requests can reuse it instead of re-establishing a connection each time.
- TCP keepalive
Unlike HTTP, TCP is designed for long-lived connections. Here keepalive is used for liveness detection: a heartbeat mechanism in which an endpoint sends probes at regular intervals in order to detect both networking problems and the health of the peer. Many protocols built on TCP handle errors in the same way by defining a PING/PONG heartbeat of their own.
Linux Network Parameters
The TCP keepalive mechanism can be set on a Linux machine using the following network parameters:
# cat /proc/sys/net/ipv4/tcp_keepalive_time
7200
# cat /proc/sys/net/ipv4/tcp_keepalive_intvl
75
# cat /proc/sys/net/ipv4/tcp_keepalive_probes
9
These are the default values. Once a connection has been idle for two hours (7200 seconds), the kernel starts sending keepalive probes, one every 75 seconds. If nine consecutive probes receive no ACK, the connection is marked as broken.
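As a rough worked example of what these defaults mean in practice, the sketch below adds the idle time to one interval per unanswered probe. The function name deadPeerDetection is hypothetical, and the result is an approximation of the kernel's behaviour rather than an exact guarantee.

```go
package main

import (
	"fmt"
	"time"
)

// Linux defaults quoted above.
const (
	idle     = 7200 * time.Second // tcp_keepalive_time
	interval = 75 * time.Second   // tcp_keepalive_intvl
	probes   = 9                  // tcp_keepalive_probes
)

// deadPeerDetection estimates how long an idle connection to a dead peer
// survives: the idle time before the first probe, plus one interval for
// each probe that goes unanswered.
func deadPeerDetection(idle, interval time.Duration, probes int) time.Duration {
	return idle + time.Duration(probes)*interval
}

func main() {
	fmt.Println(deadPeerDetection(idle, interval, probes)) // 2h11m15s
}
```

With the defaults, a silently dead peer is therefore noticed only after more than two hours, which is why services usually shorten these values per connection.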
Go API
Go's standard net package provides the following methods for controlling the keepalive mechanism of a TCP connection:
func (c *TCPConn) SetKeepAlive(keepalive bool) error
Sets whether keepalive probing is enabled on the connection.
func (c *TCPConn) SetKeepAlivePeriod(d time.Duration) error
Sets the period between keepalive probes. If it is not set, the operating system parameters are used by default.
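The two methods are typically called together right after dialing. The following is a minimal sketch against a throwaway loopback listener; enableKeepAlive and demo are hypothetical helper names, and the 30-second period is an arbitrary value chosen for illustration.

```go
package main

import (
	"fmt"
	"net"
	"time"
)

// enableKeepAlive turns on TCP keepalive for conn and sets the probe period.
func enableKeepAlive(conn *net.TCPConn, period time.Duration) error {
	if err := conn.SetKeepAlive(true); err != nil {
		return err
	}
	return conn.SetKeepAlivePeriod(period)
}

// demo dials a temporary loopback listener and applies the settings.
func demo() error {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return err
	}
	defer ln.Close()

	conn, err := net.Dial("tcp", ln.Addr().String())
	if err != nil {
		return err
	}
	defer conn.Close()

	// net.Dial returns a net.Conn; the keepalive methods live on *net.TCPConn.
	return enableKeepAlive(conn.(*net.TCPConn), 30*time.Second)
}

func main() {
	if err := demo(); err != nil {
		fmt.Println("keepalive setup failed:", err)
		return
	}
	fmt.Println("keepalive enabled")
}
```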
Demo
Let’s start with a TCP connection demo, and then we’ll use connection pooling to centrally manage TCP connections.
Transmission structure
We define two simple structs for the interaction between client and server. For demonstration purposes, JSON is used as the transport format.
// Message is the request sent by the client.
type Message struct {
	Uid string
	Val string
}

// Resp is the response returned by the server.
type Resp struct {
	Uid string
	Val string
	Ts  string
}
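To see what actually travels over the wire, this small sketch marshals a Message with encoding/json. The struct is repeated here so that the example is self-contained, and encode is a hypothetical helper name.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Message mirrors the request struct defined above.
type Message struct {
	Uid string
	Val string
}

// encode returns the JSON text that would be written to the connection.
func encode(m Message) (string, error) {
	b, err := json.Marshal(m)
	return string(b), err
}

func main() {
	s, err := encode(Message{Uid: "pixel-1", Val: "pixelpig!"})
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(s) // {"Uid":"pixel-1","Val":"pixelpig!"}
}
```

Because the structs carry no json tags, the field names appear in the output exactly as they are spelled in Go.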
The server side
const TAG = "server: hello, "

// transfer serves one client connection until an error occurs or the client closes it.
func transfer(conn net.Conn) {
	defer func() {
		remoteAddr := conn.RemoteAddr().String()
		log.Print("discard remote addr: ", remoteAddr)
		conn.Close()
	}()
	// Optionally close the connection after 10 seconds:
	//conn.SetDeadline(time.Now().Add(10 * time.Second))

	// Use one decoder per connection so that bytes it has buffered are not lost between messages.
	dec := json.NewDecoder(conn)
	for {
		var msg body.Message
		if err := dec.Decode(&msg); err != nil {
			if err != io.EOF {
				log.Printf("Decode from client err: %v", err)
				// TODO: write an error reply with a '-' prefix so the client can handle it
			}
			// io.EOF means the client closed the connection, so stop serving it
			return
		}
		if msg.Uid != "" || msg.Val != "" {
			//conn.Write([]byte(msg.Val))
			var rsp body.Resp
			rsp.Uid = msg.Uid
			rsp.Val = TAG + msg.Val
			// NOTE: msg, not rsp, is marshalled here, so the client gets its own message
			// back without the TAG prefix or a Ts value. Marshal rsp to send the response built above.
			ser, _ := json.Marshal(msg)
			conn.Write(append(ser, '\n'))
		}
	}
}

func ListenAndServer() {
	log.Print("Start server...")
	// Listen on TCP port 3000 on all local interfaces
	listen, err := net.Listen("tcp", "0.0.0.0:3000")
	if err != nil {
		log.Fatal("Listen failed. msg: ", err)
	}
	for {
		conn, err := listen.Accept()
		if err != nil {
			log.Printf("accept failed, err: %v", err)
			continue
		}
		// Serve each connection in its own goroutine
		go transfer(conn)
	}
}
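The server does not need a delimiter between requests because a json.Decoder reads one JSON value at a time from a stream. The sketch below shows this with an in-memory reader standing in for the connection; decodeAll and demo are hypothetical helper names.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"strings"
)

// Message mirrors the request struct used by the server.
type Message struct{ Uid, Val string }

// decodeAll reads every JSON value from r using a single decoder.
func decodeAll(r io.Reader) ([]Message, error) {
	dec := json.NewDecoder(r)
	var out []Message
	for {
		var m Message
		err := dec.Decode(&m)
		if err == io.EOF {
			return out, nil // clean end of stream
		}
		if err != nil {
			return out, err
		}
		out = append(out, m)
	}
}

// demo feeds two back-to-back JSON objects, as a client might send them.
func demo() ([]Message, error) {
	stream := `{"Uid":"1","Val":"a"}{"Uid":"2","Val":"b"}`
	return decodeAll(strings.NewReader(stream))
}

func main() {
	msgs, err := demo()
	fmt.Println(len(msgs), err) // 2 <nil>
}
```

Creating the decoder once per connection matters: a decoder may read ahead and buffer more than one message, and a fresh decoder would not see those buffered bytes.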
The client side
Define a Conn type that wraps the native TCP connection together with additional properties, including a context and the result channels.
type IConn interface {
	Close() error
}

// Conn represents a single connection
type Conn struct {
	addr    string             // server address
	tcp     *net.TCPConn       // TCP connection instance; could be any other connection type
	ctx     context.Context
	writer  *bufio.Writer
	cnlFun  context.CancelFunc // cancels ctx to signal termination
	retChan *sync.Map          // result channels keyed by request Uid, all belonging to this connection
	err     error
}

// Close implements IConn for Conn: it cancels the context, closes the TCP
// connection and closes the pending result channels
func (c *Conn) Close() (err error) {
	// Cancel the context
	if c.cnlFun != nil {
		c.cnlFun()
	}
	// Close the TCP connection
	if c.tcp != nil {
		err = c.tcp.Close()
	}
	// Close the result channels
	if c.retChan != nil {
		c.retChan.Range(func(key, value interface{}) bool {
			// Assert the channel type according to the business payload
			if ch, ok := value.(chan string); ok {
				close(ch)
				// Remove the entry so that a second Close does not close the channel again
				c.retChan.Delete(key)
			}
			return true
		})
	}
	return
}
Define an Option struct that holds the configuration items for the connection:
type Option struct {
	addr        string
	size        int
	readTimeout time.Duration
	dialTimeout time.Duration
	keepAlive   time.Duration
}
The connection is then created as follows:
func NewConn(opt *Option) (c *Conn, err error) {
	// Initialize the connection
	c = &Conn{
		addr:    opt.addr,
		retChan: new(sync.Map),
		//err: nil,
	}
	// Release everything if any later step fails
	defer func() {
		if err != nil {
			if c != nil {
				c.Close()
			}
		}
	}()
	// Dial
	var conn net.Conn
	if conn, err = net.DialTimeout("tcp", opt.addr, opt.dialTimeout); err != nil {
		return
	} else {
		c.tcp = conn.(*net.TCPConn)
	}
	c.writer = bufio.NewWriter(c.tcp)
	// Keepalive is switched off here; use the commented-out line to enable probing
	//if err = c.tcp.SetKeepAlive(true); err != nil {
	if err = c.tcp.SetKeepAlive(false); err != nil {
		return
	}
	if err = c.tcp.SetKeepAlivePeriod(opt.keepAlive); err != nil {
		return
	}
	// Linger 0: discard any unsent data when the connection is closed
	if err = c.tcp.SetLinger(0); err != nil {
		return
	}
	// Create the context used to manage the connection's lifetime
	c.ctx, c.cnlFun = context.WithCancel(context.Background())
	// Asynchronously receive responses into the corresponding result channels
	go receiveResp(c)
	return
}
Receiving results asynchronously
The receiveResp() function polls the connection asynchronously and has several responsibilities:
- Notice that the context has been cancelled, which usually means the connection’s cancel function was called
- Receive data from the server and write it to the matching result channel in retChan, which is a concurrency-safe sync.Map
- Watch for server-side errors and close the connection when one occurs
// receiveResp receives data from the TCP connection
func receiveResp(c *Conn) {
	scanner := bufio.NewScanner(c.tcp)
	for {
		select {
		case <-c.ctx.Done():
			// c.cnlFun() was called, for example because the connection pool was closed
			return
		default:
			if scanner.Scan() {
				// Read one line of data
				rsp := new(body.Resp)
				if err := json.Unmarshal(scanner.Bytes(), rsp); err != nil {
					return
				}
				// The response ID corresponds to the request ID
				uid := rsp.Uid
				if load, ok := c.retChan.Load(uid); ok {
					c.retChan.Delete(uid)
					// Result channel
					if ch, ok := load.(chan string); ok {
						ch <- rsp.Ts + ":" + rsp.Val
						// The writing side closes the channel
						close(ch)
					}
				}
			} else {
				// Treat a scan error and EOF the same way
				if scanner.Err() != nil {
					c.err = scanner.Err()
				} else {
					c.err = errors.New("scanner done")
				}
				c.Close()
				return
			}
		}
	}
}
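The core idea in receiveResp is matching each response to the request that is waiting for it. Below is a minimal standalone sketch of that correlation, assuming a hypothetical pending type with register and deliver methods. It uses sync.Map's LoadAndDelete (available since Go 1.15) and a buffered channel, which are choices made for this sketch rather than what the code above does.

```go
package main

import (
	"fmt"
	"sync"
)

// pending maps a request Uid to the channel waiting for its response.
// (Hypothetical type, for illustration only.)
type pending struct{ m sync.Map }

// register creates a channel for uid. The buffer of one lets deliver
// complete even if nobody is receiving yet.
func (p *pending) register(uid string) chan string {
	ch := make(chan string, 1)
	p.m.Store(uid, ch)
	return ch
}

// deliver hands val to the waiter registered for uid and reports whether
// such a waiter existed. Each waiter receives at most one value.
func (p *pending) deliver(uid, val string) bool {
	v, ok := p.m.LoadAndDelete(uid)
	if !ok {
		return false
	}
	ch := v.(chan string)
	ch <- val
	close(ch) // the writing side closes the channel
	return true
}

func main() {
	p := &pending{}
	ch := p.register("pixel-1")
	p.deliver("pixel-1", "pong")
	fmt.Println(<-ch) // pong
}
```

Loading and deleting in a single step means that two concurrent deliveries for the same Uid cannot both obtain the channel.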
Send the request
If the msg parameter were of type interface{}, it would be best to validate it with a type assertion based on the business logic, to avoid parsing errors on the server. Send returns err so that the caller can decide whether the connection is still fit to be returned to the connection pool.
func (c *Conn) Send(ctx context.Context, msg *body.Message) (ch chan string, err error) {
	// A buffer of one keeps receiveResp from blocking if the caller stops waiting
	ch = make(chan string, 1)
	c.retChan.Store(msg.Uid, ch)
	// Send the request
	js, _ := json.Marshal(msg)
	_, err = c.writer.Write(js)
	if err != nil {
		return
	}
	err = c.writer.Flush()
	// The connection is left open so that it can be added to the connection pool later
	//c.tcp.CloseWrite()
	return
}
Example:
- Start server listening:
=== RUN TestListenAndServer
2021/05/10 16:58:20 Start server...
- Initiate a request:
var OPT = &Option{
	addr:        "0.0.0.0:3000",
	size:        3,
	readTimeout: 3 * time.Second,
	dialTimeout: 3 * time.Second,
	keepAlive:   1 * time.Second,
}

// createConn dials the server and panics if the connection cannot be created
func createConn(opt *Option) *Conn {
	c, err := NewConn(opt)
	if err != nil {
		panic(err)
	}
	return c
}

func TestSendMsg(t *testing.T) {
	c := createConn(OPT)
	msg := &body.Message{Uid: "pixel-1", Val: "pixelpig!"}
	rec, err := c.Send(context.Background(), msg)
	if err != nil {
		t.Error(err)
	} else {
		// Block until the response arrives
		t.Logf("rec1: %+v", <-rec)
	}
	msg.Val = "another pig!"
	rec2, err := c.Send(context.Background(), msg)
	if err != nil {
		t.Error(err)
	} else {
		t.Logf("rec2: %+v", <-rec2)
	}
	t.Log("finished")
}
- The client output is as follows:
=== RUN   TestSendMsg
    TestSendMsg: conn_test.go:56: rec1: : pixelpig!
    TestSendMsg: conn_test.go:64: rec2: : another pig!
    TestSendMsg: conn_test.go:66: finished
--- PASS: TestSendMsg (9.94s)
PASS
Timeout and pooling management
So far this is a simple point-to-point interaction. Two further concerns need to be considered:
- Although responses arrive asynchronously, waiting for a response needs a timeout, so that a single connection cannot block forever
- Connections should be reused, that is, healthy connections should be put into a connection pool for management
Timeout judgment
There are many ways to implement a timeout. A common one is a select{} block combined with time.After(). A typical implementation looks like this:
rec3, err := c.Send(context.Background(), msg)
if err == nil {
	select {
	case resp := <-rec3:
		t.Logf("rec3: %+v", resp)
		return
	case <-time.After(time.Second * 1):
		// No response within one second
		t.Error("Wait for resp timeout!")
		return
	}
} else {
	t.Error(err)
}
The timeout output is as follows:
=== RUN   TestSendMsg
    TestSendMsg: conn_test.go:56: rec1: : pixelpig!
    TestSendMsg: conn_test.go:76: Wait for resp timeout!
--- FAIL: TestSendMsg (17.99s)
FAIL
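Since Send already accepts a context.Context, another option is to let the context carry the deadline instead of time.After(). The following is a sketch of that alternative, not the approach used in the test above; waitResp and demo are hypothetical names, and the 50 ms timeout is arbitrary.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// waitResp waits for a response on ch until ctx is cancelled or times out.
func waitResp(ctx context.Context, ch <-chan string) (string, error) {
	select {
	case resp, ok := <-ch:
		if !ok {
			// The channel was closed without a value, e.g. the connection was closed.
			return "", errors.New("connection closed before a response arrived")
		}
		return resp, nil
	case <-ctx.Done():
		return "", ctx.Err()
	}
}

// demo waits up to 50 ms on a channel that either already holds a reply
// or never receives one.
func demo(reply bool) (string, error) {
	ch := make(chan string, 1)
	if reply {
		ch <- "pong"
	}
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	return waitResp(ctx, ch)
}

func main() {
	fmt.Println(demo(true))
	fmt.Println(demo(false))
}
```

One advantage of this form is that the same context can bound a whole chain of calls, so a caller's deadline applies to the network wait as well.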
Connection Pool Management
Connection pooling is a slightly more complicated problem. It helps to list the difficulties first and then tackle them one by one:
- Enforcing an upper limit on the number of connections in the pool
- Keeping the number of idle connections up to date
- Fetching and returning connections
- Closing connections
Pooling is a lengthy topic and will be covered in the next article in this series, “Talking about Go Network Programming: TCP Connection Management (Part 2).”
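As a preview of the fetch-and-return part only, here is a deliberately small sketch built on a buffered channel. It is an assumption-laden illustration, not the design used in the follow-up article: Pool, NewPool, fakeConn and newFakePool are hypothetical names, and the sketch caps only the number of idle connections, not the total number of open ones.

```go
package main

import (
	"fmt"
	"io"
)

// Pool keeps at most cap(idle) idle connections for reuse.
type Pool struct {
	idle    chan io.Closer
	factory func() (io.Closer, error) // creates a new connection on demand
}

func NewPool(size int, factory func() (io.Closer, error)) *Pool {
	return &Pool{idle: make(chan io.Closer, size), factory: factory}
}

// Get returns an idle connection if one exists, otherwise creates a new one.
func (p *Pool) Get() (io.Closer, error) {
	select {
	case c := <-p.idle:
		return c, nil
	default:
		return p.factory()
	}
}

// Put returns a healthy connection to the pool, or closes it if the pool is full.
func (p *Pool) Put(c io.Closer) error {
	select {
	case p.idle <- c:
		return nil
	default:
		return c.Close()
	}
}

// Idle reports how many connections are currently waiting in the pool.
func (p *Pool) Idle() int { return len(p.idle) }

// fakeConn stands in for a real connection in this sketch.
type fakeConn struct{ closed bool }

func (f *fakeConn) Close() error { f.closed = true; return nil }

// newFakePool builds a pool whose factory produces fake connections.
func newFakePool(size int) *Pool {
	return NewPool(size, func() (io.Closer, error) { return &fakeConn{}, nil })
}

func main() {
	p := newFakePool(1)
	a, _ := p.Get()
	b, _ := p.Get()
	p.Put(a) // kept as the single idle connection
	p.Put(b) // pool is full, so b is closed
	fmt.Println(p.Idle(), b.(*fakeConn).closed) // 1 true
}
```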