RPC (remote procedure call) lets the nodes of a distributed system communicate by calling each other's functions. This article follows the book "Go Language Advanced Programming", which demonstrates the topic in more detail.

Let’s start with the Go native RPC.

RPC

In the communication between nodes, one node is the provider of the service, which is called the server, and the other node is the caller of the service, which is called the client. Let’s use an example of printing “Hello World” to illustrate how native RPC is used.

Hello

In this example the server and the client are separate programs, each with its own main.go.

The server

The server does several things:

  1. Implement the method the service provides, in this case one that returns a “Hello” greeting
  2. Register the service under a name
  3. Start listening on a TCP port
  4. Accept a connection and serve RPC requests on it

main.go

package main

import (
	"log"
	"net"
	"net/rpc"
)

// HelloService is the service type; it needs no fields for now.
type HelloService struct{}

// Hello is the method the service exposes.
// request is the string sent by the client.
// reply points to the string returned to the client.
func (h *HelloService) Hello(request string, reply *string) error {
	*reply = "Hello:" + request
	return nil
}

func main() {
	// Register the service: the first argument is the service name,
	// the second is the value whose methods are exposed.
	rpc.RegisterName("HelloService", new(HelloService))

	lis, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("ListenTCP error:", err)
	}

	// Accept a single connection.
	conn, err := lis.Accept()
	if err != nil {
		log.Fatal("Accept error:", err)
	}
	// Serve RPC requests on that connection until the client hangs up.
	rpc.ServeConn(conn)
}

The client

What the client needs to do:

  1. Establish a connection with the server, such as a TCP connection
  2. Call the method that the server's service provides
  3. Print the returned value

main.go

package main

import (
	"fmt"
	"log"
	"net/rpc"
)

func main() {
	// Set up a connection to the server.
	client, err := rpc.Dial("tcp", "localhost:1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	var reply string
	// Call the method.
	// First argument: "service name.method name".
	// Second argument: the request sent to the method.
	// Third argument: pointer that receives the string returned in the response.
	err = client.Call("HelloService.Hello", "world", &reply)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(reply)
}

The result

Start the server, then run the client. The client prints Hello:world.

The code above is simple, but it does not lend itself to team collaboration or long-term maintenance: the service name and the method signature are repeated as string literals in both programs. So we define a small specification (an interface) that both sides share. First, fix the service name:

const HelloServiceName = "HelloService"

Second, define the interface:

type HelloServiceInterface interface {
	Hello(request string, reply *string) error
}

This code goes into both the server and the client. Each side then gets a further layer of encapsulation.

The server

On the server side we wrap service registration. Because the parameter has the interface type, a value that does not implement Hello with the right signature fails to compile:

func RegisterHelloService(svc HelloServiceInterface) error {
	return rpc.RegisterName(HelloServiceName, svc)
}

The client

Define a struct that embeds *rpc.Client:

type HelloServiceClient struct {
	*rpc.Client
}

  1. Wrap connection setup:

func DialHelloService(network, address string) (*HelloServiceClient, error) {
	c, err := rpc.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return &HelloServiceClient{Client: c}, nil
}

  2. Wrap the call:

func (p *HelloServiceClient) Hello(request string, reply *string) error {
	return p.Client.Call(HelloServiceName+".Hello", request, reply)
}

The complete code

The server

package main

import (
	"log"
	"net"
	"net/rpc"
)

// HelloServiceName is the name the service is registered under.
const HelloServiceName = "HelloService"

// HelloServiceInterface is the interface of the HelloService.
type HelloServiceInterface interface {
	Hello(request string, reply *string) error
}

// HelloService is the service type; it needs no fields for now.
type HelloService struct{}

// Hello is the method the service exposes.
// request is the string sent by the client.
// reply points to the string returned to the client.
func (h *HelloService) Hello(request string, reply *string) error {
	*reply = "Hello:" + request
	return nil
}

// RegisterHelloService registers any implementation of the interface
// under the shared service name.
func RegisterHelloService(svc HelloServiceInterface) error {
	return rpc.RegisterName(HelloServiceName, svc)
}

func main() {
	// Previously: rpc.RegisterName("HelloService", new(HelloService))
	if err := RegisterHelloService(new(HelloService)); err != nil {
		log.Fatal("Register error:", err)
	}

	lis, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("ListenTCP error:", err)
	}

	conn, err := lis.Accept()
	if err != nil {
		log.Fatal("Accept error:", err)
	}
	// Serve RPC requests on the accepted connection.
	rpc.ServeConn(conn)
}
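
The server above accepts one connection and exits when that client hangs up. A minimal sketch of a server that keeps accepting connections follows. The helpers serveForever, callHello and demo are hypothetical names introduced for this illustration, and the loopback listener on port 0 is an assumption made so that the example runs as a single program.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
)

// HelloService mirrors the service defined in the article.
type HelloService struct{}

func (h *HelloService) Hello(request string, reply *string) error {
	*reply = "Hello:" + request
	return nil
}

// serveForever (hypothetical helper) accepts connections until the listener
// is closed and serves each one in its own goroutine.
func serveForever(srv *rpc.Server, lis net.Listener) {
	for {
		conn, err := lis.Accept()
		if err != nil {
			return // the listener was closed
		}
		go srv.ServeConn(conn)
	}
}

// callHello dials addr, calls HelloService.Hello once, and hangs up.
func callHello(addr, name string) (string, error) {
	client, err := rpc.Dial("tcp", addr)
	if err != nil {
		return "", err
	}
	defer client.Close()

	var reply string
	err = client.Call("HelloService.Hello", name, &reply)
	return reply, err
}

// demo starts the server on a free loopback port and connects twice.
func demo() ([]string, error) {
	srv := rpc.NewServer() // private server, so the global registry stays untouched
	if err := srv.RegisterName("HelloService", new(HelloService)); err != nil {
		return nil, err
	}
	lis, err := net.Listen("tcp", "127.0.0.1:0") // port 0: the OS picks a free port
	if err != nil {
		return nil, err
	}
	defer lis.Close()
	go serveForever(srv, lis)

	var replies []string
	for _, name := range []string{"first", "second"} {
		reply, err := callHello(lis.Addr().String(), name)
		if err != nil {
			return nil, err
		}
		replies = append(replies, reply)
	}
	return replies, nil
}

func main() {
	replies, err := demo()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(replies) // [Hello:first Hello:second]
}
```

The standard library offers the same loop as rpc.Accept (or the Accept method of an *rpc.Server), so the hand-written loop is only needed when each connection requires extra handling.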

The client

package main

import (
	"fmt"
	"log"
	"net/rpc"
)

// HelloServiceName is the name the service is registered under.
const HelloServiceName = "HelloService"

// HelloServiceInterface is the interface of the HelloService.
type HelloServiceInterface interface {
	Hello(request string, reply *string) error
}

// HelloServiceClient wraps the RPC client.
type HelloServiceClient struct {
	*rpc.Client
}

// Compile-time check that *HelloServiceClient implements HelloServiceInterface.
var _ HelloServiceInterface = (*HelloServiceClient)(nil)

// DialHelloService wraps connection setup.
func DialHelloService(network, address string) (*HelloServiceClient, error) {
	c, err := rpc.Dial(network, address)
	if err != nil {
		return nil, err
	}
	return &HelloServiceClient{Client: c}, nil
}

// Hello wraps the remote call to HelloService.Hello.
func (p *HelloServiceClient) Hello(request string, reply *string) error {
	return p.Client.Call(HelloServiceName+".Hello", request, reply)
}

func main() {
	// Establish a connection.
	// Previously: client, err := rpc.Dial("tcp", "localhost:1234")
	client, err := DialHelloService("tcp", "localhost:1234")
	if err != nil {
		log.Fatal("dialing:", err)
	}

	var reply string
	// Call the method through the typed wrapper.
	// Previously: err = client.Call("HelloService.Hello", "world", &reply)
	err = client.Hello("world", &reply)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(reply)
}

RPC principle

This section looks at the net/rpc source code.

Client Principles

// Source of Client.Call
func (client *Client) Call(
	serviceMethod string, args interface{},
	reply interface{},
) error {
	call := <-client.Go(serviceMethod, args, reply, make(chan *Call, 1)).Done
	return call.Error
}

Call first makes an asynchronous call through the Client.Go method, which returns a Call struct representing the invocation. It then waits on that struct's Done channel for the result.

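// Source of Client.Go
func (client *Client) Go(
	serviceMethod string, args interface{},
	reply interface{},
	done chan *Call,
) *Call {
	call := new(Call)
	call.ServiceMethod = serviceMethod
	call.Args = args
	call.Reply = reply
	if done == nil {
		done = make(chan *Call, 10) // buffered.
	} else {
		// If the caller passes done != nil, it must arrange that
		// done has enough buffer for the number of simultaneous RPCs.
		if cap(done) == 0 {
			log.Panic("rpc: done channel is unbuffered")
		}
	}
	call.Done = done

	client.send(call)
	return call
}

Go() constructs a Call struct, sends it to the server with client.send(call), and returns it. If the caller passes nil for done, Go allocates a buffered channel; an unbuffered channel is rejected with a panic. When the result comes back, the unexported call.done() method signals completion: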

func (call *Call) done() {
	select {
	case call.Done <- call:
		// ok
	default:
		// We don't want to block here. It is the caller's responsibility to make
		// sure the channel has enough buffer space. See comment in Go().
	}
}
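
The select with a default branch in done() is Go's non-blocking send: if the channel has no room, the value is dropped instead of stalling the sender. A small standalone sketch of the same pattern follows; trySend is a hypothetical helper that is not part of net/rpc.

```go
package main

import "fmt"

// trySend (hypothetical helper) delivers v on ch if there is room and
// reports whether it did. It never blocks: when the buffer is full the
// default branch runs and the value is dropped.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

func main() {
	ch := make(chan int, 1)
	fmt.Println(trySend(ch, 1)) // true: the buffer had room
	fmt.Println(trySend(ch, 2)) // false: the buffer is full, so 2 is dropped
	fmt.Println(<-ch)           // 1
}
```

This is why the Done channel has to be buffered: with an unbuffered channel and no receiver already waiting, the completion notice would be discarded.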

Having read the source, we can reproduce what client.Call() does by calling client.Go() directly, which leaves room to do other work while the call is in flight:

func doClientWork(client *rpc.Client) {
	helloCall := client.Go("HelloService.Hello", "hello", new(string), nil)

	// do something else while the call is in flight

	helloCall = <-helloCall.Done
	if err := helloCall.Error; err != nil {
		log.Fatal(err)
	}

	args := helloCall.Args.(string)
	// Reply holds the *string passed to Go, so assert the pointer type.
	reply := helloCall.Reply.(*string)
	fmt.Println(args, *reply)
}
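
Because Go() accepts a caller-supplied done channel, several calls can share one channel and be collected as they finish. The sketch below assumes an in-process server connected through net.Pipe so that it runs as one program; helloAll and demo are hypothetical names introduced for this illustration.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
	"sort"
)

// HelloService mirrors the service defined in the article.
type HelloService struct{}

func (h *HelloService) Hello(request string, reply *string) error {
	*reply = "Hello:" + request
	return nil
}

// helloAll (hypothetical helper) issues one asynchronous call per name, all
// sharing a single done channel, then collects the replies as they complete.
func helloAll(client *rpc.Client, names []string) ([]string, error) {
	// The buffer must hold every pending call; otherwise completions are dropped.
	done := make(chan *rpc.Call, len(names))
	for _, name := range names {
		client.Go("HelloService.Hello", name, new(string), done)
	}

	replies := make([]string, 0, len(names))
	for range names {
		call := <-done
		if call.Error != nil {
			return nil, call.Error
		}
		replies = append(replies, *call.Reply.(*string))
	}
	sort.Strings(replies) // completion order is not guaranteed
	return replies, nil
}

// demo wires a client to an in-process server over net.Pipe.
func demo() ([]string, error) {
	srv := rpc.NewServer()
	if err := srv.RegisterName("HelloService", new(HelloService)); err != nil {
		return nil, err
	}
	clientConn, serverConn := net.Pipe()
	go srv.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()
	return helloAll(client, []string{"a", "b", "c"})
}

func main() {
	replies, err := demo()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(replies) // [Hello:a Hello:b Hello:c]
}
```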

Server Principles

//TODO

In-memory KV database

This is the second example: an in-memory key-value database, used to show how a Watch function can be built on RPC.

The server

Step 1: define the service struct. m stores the data, filter holds the filter functions registered by each Watch call, and mu guards both maps.

// KVStoreService is an in-memory key-value storage service.
type KVStoreService struct {
	m      map[string]string
	filter map[string]func(key string)
	mu     sync.Mutex
}
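
The server's main function calls NewKVStoreService(), but the article does not show its definition. A minimal sketch follows, under the assumption that the constructor only needs to initialize the two maps (writing to a nil map panics in Go).

```go
package main

import (
	"fmt"
	"sync"
)

// KVStoreService matches the struct defined in the article.
type KVStoreService struct {
	m      map[string]string
	filter map[string]func(key string)
	mu     sync.Mutex
}

// NewKVStoreService returns a store whose maps are ready to use.
// Assumed shape: the article calls this constructor without showing it.
func NewKVStoreService() *KVStoreService {
	return &KVStoreService{
		m:      make(map[string]string),
		filter: make(map[string]func(key string)),
	}
}

func main() {
	s := NewKVStoreService()
	s.m["abc"] = "abc-value" // would panic on a zero-value KVStoreService
	fmt.Println(len(s.m), len(s.filter)) // 1 0
}
```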

Get and Set methods

// Get looks up the value stored under key.
func (p *KVStoreService) Get(key string, value *string) error {
	p.mu.Lock()
	defer p.mu.Unlock()

	if v, ok := p.m[key]; ok {
		*value = v
		return nil
	}

	return fmt.Errorf("not found")
}

// Set stores a value and notifies watchers if it changed.
func (p *KVStoreService) Set(kv [2]string, reply *struct{}) error {
	p.mu.Lock()
	defer p.mu.Unlock()
	key, value := kv[0], kv[1]

	// Notify every registered filter when the value changes.
	if oldValue := p.m[key]; oldValue != value {
		fmt.Println(key)
		for _, fn := range p.filter {
			fn(key)
		}
	}

	p.m[key] = value
	return nil
}

When a value changes, Set calls every function in p.filter. Next is the Watch method. It registers a filter that pushes the changed key into a channel, then waits until either a key arrives on that channel or the timeout expires, and returns the key it received.

// Watch blocks until some key changes or timeoutSecond seconds pass.
func (p *KVStoreService) Watch(timeoutSecond int, keyChanged *string) error {
	id := fmt.Sprintf("watch-%s-%03d", time.Now(), rand.Int())
	ch := make(chan string, 10) // buffered

	p.mu.Lock()
	p.filter[id] = func(key string) { ch <- key }
	p.mu.Unlock()

	// Unregister the filter on return so that stale filters do not
	// accumulate and eventually block Set on a full channel.
	defer func() {
		p.mu.Lock()
		delete(p.filter, id)
		p.mu.Unlock()
	}()

	select {
	case <-time.After(time.Duration(timeoutSecond) * time.Second):
		return fmt.Errorf("timeout")
	case key := <-ch:
		*keyChanged = key
		return nil
	}
}

The main function

func main() {
	// NewKVStoreService must return a store with both maps initialized.
	rpc.RegisterName("KVStoreService", NewKVStoreService())
	lis, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("net error")
	}
	conn, err := lis.Accept()
	if err != nil {
		log.Fatal("conn error")
	}
	rpc.ServeConn(conn)
}

The client

What the client does:

  1. Start a goroutine that calls the Watch method
  2. Set a key-value pair
  3. Print the response

package main

import (
	"fmt"
	"log"
	"net/rpc"
	"time"
)

func doClientWork(client *rpc.Client) {
	// Watch in the background; the call blocks until a key changes
	// or 30 seconds pass.
	go func() {
		var keyChanged string
		err := client.Call("KVStoreService.Watch", 30, &keyChanged)
		if err != nil {
			log.Fatal(err)
		}
		fmt.Println("watch:", keyChanged)
	}()

	// Give the Watch call time to register its filter on the server.
	time.Sleep(time.Second * 1)

	err := client.Call(
		"KVStoreService.Set", [2]string{"abc", "abc-value"},
		new(struct{}),
	)
	if err != nil {
		log.Fatal(err)
	}

	// Keep the process alive long enough for the watcher to print.
	time.Sleep(time.Second * 3)
}

func main() {
	client, err := rpc.Dial("tcp", "localhost:1234")
	if err != nil {
		log.Fatal("connection error")
	}
	doClientWork(client)
}

Reverse RPC

This is the third example: reverse RPC, where a machine inside an intranet provides an RPC service to the outside. The intranet machine still acts as the RPC server, but it opens the TCP connection itself by dialing a publicly reachable machine. The public machine accepts that connection and then acts as the RPC client, calling the service over it. The intranet server looks like this:

func main() {
	rpc.Register(new(HelloService))

	for {
		// Dial out instead of listening; retry once per second until it works.
		conn, _ := net.Dial("tcp", "localhost:1234")
		if conn == nil {
			time.Sleep(time.Second)
			continue
		}

		// Serve RPC on the connection we opened, then reconnect.
		rpc.ServeConn(conn)
		conn.Close()
	}
}

Notice that Listen has become Dial. The intranet service no longer listens on a TCP port; it connects to the peer's TCP server and then serves RPC on each established connection. Here is the code for the public-side client:

func doClientWork(clientChan <-chan *rpc.Client) {
	// Wait for the intranet server to connect.
	client := <-clientChan
	defer client.Close()

	var reply string
	err := client.Call("HelloService.Hello", "hello", &reply)
	if err != nil {
		log.Fatal(err)
	}

	fmt.Println(reply)
}

func main() {
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("ListenTCP error:", err)
	}

	clientChan := make(chan *rpc.Client)

	go func() {
		for {
			conn, err := listener.Accept()
			if err != nil {
				log.Fatal("Accept error:", err)
			}

			// Wrap each accepted connection in an RPC client.
			clientChan <- rpc.NewClient(conn)
		}
	}()

	doClientWork(clientChan)
}

The public side listens for TCP connections, wraps each accepted connection in an rpc.Client, and hands it over a channel to doClientWork().
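
To see that the RPC roles are independent of who opened the TCP connection, both sides can be run in one process. This is only a sketch: the loopback listener on port 0 and the demo function are assumptions made so that the example is self-contained.

```go
package main

import (
	"fmt"
	"log"
	"net"
	"net/rpc"
)

// HelloService mirrors the service defined in the article.
type HelloService struct{}

func (h *HelloService) Hello(request string, reply *string) error {
	*reply = "Hello:" + request
	return nil
}

// demo (hypothetical) plays both the intranet and the public role.
func demo() (string, error) {
	lis, err := net.Listen("tcp", "127.0.0.1:0") // port 0: the OS picks a free port
	if err != nil {
		return "", err
	}
	defer lis.Close()

	srv := rpc.NewServer()
	if err := srv.RegisterName("HelloService", new(HelloService)); err != nil {
		return "", err
	}

	// Intranet side: dial out, then serve RPC on the connection it opened.
	go func() {
		conn, err := net.Dial("tcp", lis.Addr().String())
		if err != nil {
			return
		}
		srv.ServeConn(conn)
	}()

	// Public side: accept the connection, then use it as an RPC client.
	conn, err := lis.Accept()
	if err != nil {
		return "", err
	}
	client := rpc.NewClient(conn)
	defer client.Close()

	var reply string
	err = client.Call("HelloService.Hello", "hello", &reply)
	return reply, err
}

func main() {
	reply, err := demo()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println(reply) // Hello:hello
}
```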

Provide RPC services based on context information

We can provide different RPC services based on different connections, for example, based on login status:

// HelloService carries per-connection state.
type HelloService struct {
	conn    net.Conn
	isLogin bool
}

// Login marks this connection as authenticated.
func (p *HelloService) Login(request string, reply *string) error {
	if request != "user:password" {
		return fmt.Errorf("auth failed")
	}
	log.Println("login ok")
	p.isLogin = true
	return nil
}

// Hello only answers after a successful Login on the same connection.
func (p *HelloService) Hello(request string, reply *string) error {
	if !p.isLogin {
		return fmt.Errorf("please login")
	}
	*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
	return nil
}

func main() {
	listener, err := net.Listen("tcp", ":1234")
	if err != nil {
		log.Fatal("ListenTCP error:", err)
	}

	for {
		conn, err := listener.Accept()
		if err != nil {
			log.Fatal("Accept error:", err)
		}

		go func() {
			defer conn.Close()

			// A fresh server and service value per connection keeps
			// the login state separate for each client.
			p := rpc.NewServer()
			p.Register(&HelloService{conn: conn})
			p.ServeConn(conn)
		}()
	}
}
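
The article shows only the server for this example. A sketch of how a client might exercise it follows: Hello is rejected until Login succeeds on the same connection. The in-process net.Pipe connection and the demo function are assumptions made for illustration, and the service methods are repeated here so that the example is self-contained.

```go
package main

import (
	"errors"
	"fmt"
	"log"
	"net"
	"net/rpc"
)

// HelloService mirrors the per-connection service defined in the article.
type HelloService struct {
	conn    net.Conn
	isLogin bool
}

func (p *HelloService) Login(request string, reply *string) error {
	if request != "user:password" {
		return fmt.Errorf("auth failed")
	}
	p.isLogin = true
	return nil
}

func (p *HelloService) Hello(request string, reply *string) error {
	if !p.isLogin {
		return fmt.Errorf("please login")
	}
	*reply = "hello:" + request + ", from" + p.conn.RemoteAddr().String()
	return nil
}

// demo (hypothetical) calls Hello before and after Login on one connection.
// It returns the error text of the rejected call and the final reply.
func demo() (string, string, error) {
	clientConn, serverConn := net.Pipe()
	srv := rpc.NewServer()
	if err := srv.Register(&HelloService{conn: serverConn}); err != nil {
		return "", "", err
	}
	go srv.ServeConn(serverConn)

	client := rpc.NewClient(clientConn)
	defer client.Close()

	var reply string
	// Before logging in, Hello is expected to fail.
	rejected := client.Call("HelloService.Hello", "world", &reply)
	if rejected == nil {
		return "", "", errors.New("expected Hello to fail before Login")
	}
	if err := client.Call("HelloService.Login", "user:password", &reply); err != nil {
		return "", "", err
	}
	if err := client.Call("HelloService.Hello", "world", &reply); err != nil {
		return "", "", err
	}
	return rejected.Error(), reply, nil
}

func main() {
	rejected, reply, err := demo()
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("before login:", rejected)
	fmt.Println("after login:", reply)
}
```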