Go network programming (I)–TCP connection communication

TCP Protocol Overview

In the seven-layer network model, TCP sits below HTTP: an HTTP connection is ultimately established on top of an underlying TCP connection. TCP connection identity: establishing a network connection between two machines, also known as the handshake, is essentially the association of two file handles (FDs), one at each end. Each connection is uniquely identified by the four-tuple <source IP address, source port, destination IP address, destination port>. The number of connections a machine can hold is therefore bounded by its file-handle limit (ulimit).

Comparing KeepAlive for long-lived connections

  • HTTP keepalive

As we all know, HTTP is stateless, and its connections are usually torn down once a request completes. Enabling keepalive tells both sides to hold the connection open for a period of time, avoiding the cost of frequently re-establishing it.

  • TCP keepalive

    Many existing TCP protocols support this way of error handling by defining some sort of heartbeat mechanism that requires each endpoint to send PING/PONG probes at a regular interval in order to detect both networking problems, as well as service health.

    Unlike HTTP, TCP is designed for long-lived connections. Its keepalive is used for liveness detection: as the quote above puts it, each endpoint sends ping/pong probes at regular intervals to detect both network problems and service health.

Linux Network Parameters

The TCP keepalive mechanism can be set on a Linux machine using the following network parameters:

  # cat /proc/sys/net/ipv4/tcp_keepalive_time
  7200

  # cat /proc/sys/net/ipv4/tcp_keepalive_intvl
  75

  # cat /proc/sys/net/ipv4/tcp_keepalive_probes
  9

These are the default settings: after a connection has been idle for two hours (7200 seconds), the kernel sends a keepalive probe, and if no ACK arrives it retries every 75 seconds. After nine consecutive unanswered probes, the connection is marked as broken.

Introducing the Go API

In Go's native net package, the following methods can tune the keepalive behaviour of a TCP connection:

  • func (c *TCPConn) SetKeepAlive(keepalive bool) error

    Enables or disables keepalive probing on the connection
  • func (c *TCPConn) SetKeepAlivePeriod(d time.Duration) error

    Sets the interval between keepalive probes; if it is not set, the operating system's default is used

Use case Demo

Let’s start with a TCP connection demo, and then we’ll use connection pooling to centrally manage TCP connections.

Transmission structure

Define two simple structs for client/server interaction; JSON is used as the transport encoding for this demonstration.

type Message struct {
	Uid string
	Val string
}

type Resp struct {
	Uid string
	Val string
	Ts string
}

The server side

const TAG = "server: hello, "

func transfer(conn net.Conn) {
	defer func() {
		remoteAddr := conn.RemoteAddr().String()
		log.Print("discard remote addr:", remoteAddr)
		conn.Close()
	}()

	// Optionally close the connection after 10 seconds
	//conn.SetDeadline(time.Now().Add(10 * time.Second))

	for {
		var msg body.Message

		if err := json.NewDecoder(conn).Decode(&msg); err != nil && err != io.EOF {
			log.Printf("Decode from client err: %v", err)
			// todo... write an err prefix such as '-' to notify the client to handle the error
			return
		}

		if msg.Uid != "" || msg.Val != "" {
			//conn.Write([]byte(msg.Val))
			var rsp body.Resp
			rsp.Uid = msg.Uid
			rsp.Val = TAG + msg.Val
			ser, _ := json.Marshal(rsp)

			conn.Write(append(ser, '\n'))
		}
	}
}

func ListenAndServer() {
	log.Print("Start server...")
	// Listen on local TCP port 3000
	listen, err := net.Listen("tcp", "0.0.0.0:3000")
	if err != nil {
		log.Fatal("Listen failed. msg: ", err)
		return
	}
	for {
		conn, err := listen.Accept()
		if err != nil {
			log.Printf("accept failed, err: %v", err)
			continue
		}
		go transfer(conn)
	}
}

The client side

Define a Conn connection type that wraps the native TCP connection plus additional properties, including a context and a result-channel map.

type IConn interface {
	Close() error
}

// Conn corresponds to a single connection
type Conn struct {
	addr    string             // address
	tcp     *net.TCPConn       // TCP connection instance; could be any other connection type
	ctx     context.Context
	writer  *bufio.Writer
	cnlFun  context.CancelFunc // used to signal termination via ctx
	retChan *sync.Map          // map of result channels belonging to this connection
	err     error
}

// Close implements IConn for Conn: release the connection and close its message channels
func (c *Conn) Close() (err error) {
	// Run the cleanup
	if c.cnlFun != nil {
		c.cnlFun()
	}

	// Close the TCP connection
	if c.tcp != nil {
		err = c.tcp.Close()
	}

	// Close the message channels
	if c.retChan != nil {
		c.retChan.Range(func(key, value interface{}) bool {
			// Assert the channel type according to the concrete business type
			if ch, ok := value.(chan string); ok {
				close(ch)
			}
			return true
		})
	}
	return
}

Define an Option struct holding the connection's configuration items:

type Option struct {
	addr        string
	size        int
	readTimeout time.Duration
	dialTimeout time.Duration
	keepAlive   time.Duration
}

Then create the connection code as follows:

func NewConn(opt *Option) (c *Conn, err error) {
	// Initialize the connection
	c = &Conn{
		addr:    opt.addr,
		retChan: new(sync.Map),
		//err: nil,
	}

	defer func() {
		if err != nil {
			if c != nil {
				c.Close()
			}
		}
	}()

	// Dial
	var conn net.Conn
	if conn, err = net.DialTimeout("tcp", opt.addr, opt.dialTimeout); err != nil {
		return
	} else {
		c.tcp = conn.(*net.TCPConn)
	}

	c.writer = bufio.NewWriter(c.tcp)

	//if err = c.tcp.SetKeepAlive(true); err != nil {
	if err = c.tcp.SetKeepAlive(false); err != nil {
		return
	}
	if err = c.tcp.SetKeepAlivePeriod(opt.keepAlive); err != nil {
		return
	}
	if err = c.tcp.SetLinger(0); err != nil {
		return
	}

	// Create the context for lifecycle management
	c.ctx, c.cnlFun = context.WithCancel(context.Background())

	// Asynchronously receive the result to the corresponding result set
	go receiveResp(c)

	return
}

Receiving results asynchronously

The receiveResp() function polls asynchronously and has several jobs:

  • Notice when the context is closed, usually because the connection's cancel() was invoked
  • Receive data from the server and write it to the matching result channel in retChan, a concurrency-safe sync.Map
  • Watch for server-side errors and close the connection on failure
// receiveResp receives data from the TCP connection
func receiveResp(c *Conn) {
	scanner := bufio.NewScanner(c.tcp)
	for {
		select {
		case <-c.ctx.Done():
			// c.cnlFun() was executed, e.g. the connection pool was closed
			return
		default:
			if scanner.Scan() {
				// Read one line of data
				rsp := new(body.Resp)
				if err := json.Unmarshal(scanner.Bytes(), rsp); err != nil {
					return
				}
				// The response ID matches the request ID
				uid := rsp.Uid
				if load, ok := c.retChan.Load(uid); ok {
					c.retChan.Delete(uid)
					// Message channel
					if ch, ok := load.(chan string); ok {
						ch <- rsp.Ts + ":" + rsp.Val
						// Close on the writer side
						close(ch)
					}
				}
			} else {
				// Error, including EOF
				if scanner.Err() != nil {
					c.err = scanner.Err()
				} else {
					c.err = errors.New("scanner done")
				}
				c.Close()
				return
			}
		}
	}
}

Send the request

Send() writes a request to the connection. If the msg body parameter were of type interface{}, it would be wise to validate it with a type assertion according to the business schema, to avoid parse errors on the server side. The returned err can later be used to decide whether the connection should be returned to the pool.
func (c *Conn) Send(ctx context.Context, msg *body.Message) (ch chan string, err error) {
	ch = make(chan string)
	c.retChan.Store(msg.Uid, ch)
	// Request
	js, _ := json.Marshal(msg)

	_, err = c.writer.Write(js)
	if err != nil {
		return
	}

	err = c.writer.Flush()
	// The connection is not closed here, so it can later be returned to a pool
	//c.tcp.CloseWrite()
	return
}

Example:

  1. Start server listening:
=== RUN   TestListenAndServer
2021/05/10 16:58:20 Start server...
  2. Initiate a request:
var OPT = &Option{
	addr:        "0.0.0.0:3000",
	size:        3,
	readTimeout: 3 * time.Second,
	dialTimeout: 3 * time.Second,
	keepAlive:   1 * time.Second,
}

func createConn(opt *Option) *Conn {
	c, err := NewConn(opt)
	if err != nil {
		panic(err)
	}
	return c
}

func TestSendMsg(t *testing.T) {
	c := createConn(OPT)
	msg := &body.Message{Uid: "pixel-1", Val: "pixelpig!"}
	rec, err := c.Send(context.Background(), msg)
	if err != nil {
		t.Error(err)
	} else {
		t.Logf("rec1: %+v", <-rec)
	}

	msg.Val = "another pig!"
	rec2, err := c.Send(context.Background(), msg)
	if err != nil {
		t.Error(err)
	} else {
		t.Logf("rec2: %+v", <-rec2)
	}
	t.Log("finished")
}
  3. The client output is as follows:
=== RUN   TestSendMsg
    conn_test.go:56: rec1: : pixelpig!
    conn_test.go:64: rec2: : another pig!
    conn_test.go:66: finished
--- PASS: TestSendMsg (9.94s)
PASS

Timeout and pooling management

The demo above is simple point-to-point interaction, but connection timeouts also deserve consideration:

  1. Although results arrive asynchronously, each wait should have a timeout so that a single connection cannot block forever
  2. Reuse matters: healthy connections should be placed in a connection pool for centralized management

Timeout judgment

There are many industry practices for detecting timeouts. A common one is a select{} block combined with time.After(). A typical implementation:

rec3, err := c.Send(context.Background(), msg)
if err == nil {
	select {
	case resp := <-rec3:
		t.Logf("rec3: %+v", resp)
		return
	case <-time.After(time.Second * 1):
		t.Error("Wait for resp timeout!")
		return
	}
} else {
	t.Error(err)
}

The timeout output is as follows:

=== RUN   TestSendMsg
    conn_test.go:56: rec1: : pixelpig!
    conn_test.go:76: Wait for resp timeout!
--- FAIL: TestSendMsg (17.99s)
FAIL

Connection Pool Management

Pooling is a slightly more complicated matter. Let's first list the difficulties, then tackle them one by one:

  1. An upper limit on the number of connections in the pool
  2. Keeping the count of idle connections up to date
  3. Acquiring connections from, and returning them to, the pool
  4. Closing connections

Pooling could fill an article of its own, and it will be explained in the next article in this series, "Talking about Go Network Programming – TCP Connection Management (Part 2)".

Reference links

  • Notes on TCP keepalive in Go – thenotexpert.com/golang-tcp-…
  • Using TCP keepalive under Linux – tldp.org/HOWTO/TCP-K…
  • Graphic Network – Kobayashi Coding