preface
Hello everyone, I am fried fish. This chapter will introduce the flow of gRPC, which can be divided into three types:
- Server-side Streaming RPC: Indicates the server-side streaming RPC
- Client-side Streaming RPC: Indicates the Client streaming RPC
- Bidirectional Streaming RPC: Indicates two-way streaming RPC
flow
Any technology, because there are pain points, so there is the necessity of existence. If you want to see streaming calls to gRPC, go ahead
figure
GRPC Streaming is based on HTTP/2, which will be explained in more detail in the following sections
Why not use Simple RPC
Why should streaming exist? Is there something wrong with Simple RPC? By simulating service scenarios, the following problems occur when Simple RPC is used:
- Instantaneous pressure caused by too large packets
- When receiving data packets, only after all data packets are accepted successfully and correctly, can the response be called back for service processing (the client cannot send the data packets while the server processes them).
Why Streaming RPC
- Large packet
- Real-time scene
Simulation scenario
Every day at 6 am, A batch of data sets of millions are synchronized from A to B. During synchronization, A series of operations (archiving, data analysis, profiling, logging, etc.) are performed. That’s a lot of data at one time
After the synchronization is completed, some people will immediately go to the data, for the new day preparation. Also in real time.
In contrast, Streaming RPC is more suitable for this scenario
gRPC
I’ll focus on the first section when I talk about gRPC streaming code specifically, because the three modes are really different combinations. Hope you can pay attention to understand, draw inferences by analogy, it is same knowledge point actually 👍
The directory structure
$├─ ├─ ├─ ├─ go │ ├─ $├─ go │ ├─ go │ ├─ $├─ go │ ├─ go │ ├─ ├─ ├─ class.go ├── search. Go ├─ class.go ├── search └ ─ ─ server. GoCopy the code
Stream_server, stream_client to store server and client files, proto/stream.proto to write IDL
IDL
In the stream. Proto file under the proto folder, write the following:
syntax = "proto3";
package proto;
service StreamService {
rpc List(StreamRequest) returns (stream StreamResponse) {};
rpc Record(stream StreamRequest) returns (StreamResponse) {};
rpc Route(stream StreamRequest) returns (stream StreamResponse) {};
}
message StreamPoint {
string name = 1;
int32 value = 2;
}
message StreamRequest {
StreamPoint pt = 1;
}
message StreamResponse {
StreamPoint pt = 1;
}
Copy the code
Note the keyword stream, which is declared as a stream method. There are three methods involved here, and the corresponding relationship is
- List: server-side streaming RPC
- Record: client streaming RPC
- Route: bidirectional streaming RPC
Base template + empty definition
Server
package main
import (
"log"
"net"
"google.golang.org/grpc"
pb "github.com/EDDYCJY/go-grpc-example/proto"
)
type StreamService struct{}
const (
PORT = "9002"
)
func main() {
server := grpc.NewServer()
pb.RegisterStreamServiceServer(server, &StreamService{})
lis, err := net.Listen("tcp".":"+PORT)
iferr ! = nil { log.Fatalf("net.Listen err: %v", err)
}
server.Serve(lis)
}
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
return nil
}
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
return nil
}
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
return nil
}
Copy the code
Before writing the code, you are advised to define the basic template and interface of the gRPC Server. If you are not clear, refer to the previous chapter for information
Client
package main
import (
"log"
"google.golang.org/grpc"
pb "github.com/EDDYCJY/go-grpc-example/proto"
)
const (
PORT = "9002"
)
func main() {
conn, err := grpc.Dial(":"+PORT, grpc.WithInsecure())
iferr ! = nil { log.Fatalf("grpc.Dial err: %v", err)
}
defer conn.Close()
client := pb.NewStreamServiceClient(conn)
err = printLists(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: List", Value: 2018}})
iferr ! = nil { log.Fatalf("printLists.err: %v", err)
}
err = printRecord(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Record", Value: 2018}})
iferr ! = nil { log.Fatalf("printRecord.err: %v", err)
}
err = printRoute(client, &pb.StreamRequest{Pt: &pb.StreamPoint{Name: "gRPC Stream Client: Route", Value: 2018}})
iferr ! = nil { log.Fatalf("printRoute.err: %v", err)
}
}
func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
return nil
}
Copy the code
1, The Server-side Streaming RPC: the Server-side streaming RPC
Server-side streaming RPC is obviously one-way flow, and refers to the Server as a Stream and the Client as a normal RPC request
In simple terms, the client initiates a common RPC request, the server sends the data set several times through streaming response, and the client Recv receives the data set. The general figure is as follows:
Server
func (s *StreamService) List(r *pb.StreamRequest, stream pb.StreamService_ListServer) error {
for n := 0; n <= 6; n++ {
err := stream.Send(&pb.StreamResponse{
Pt: &pb.StreamPoint{
Name: r.Pt.Name,
Value: r.Pt.Value + int32(n),
},
})
iferr ! = nil {return err
}
}
return nil
}
Copy the code
On the Server, focus on the stream.send method. It looks like it can be sent N times, right? Is there a size limit?
type StreamService_ListServer interface {
Send(*StreamResponse) error
grpc.ServerStream
}
func (x *streamServiceListServer) Send(m *StreamResponse) error {
return x.ServerStream.SendMsg(m)
}
Copy the code
Through reading the source code, it is known that protoc generates various interface methods according to the definition when it is generated. Finally, the internal SendMsg method is unified, which involves the following process:
- Message body (object) serialization
- Compressing the serialized message body
- Adds a 5-byte header to the body of the message being transmitted
- Determines whether the total length of the compressed + serialized message body is greater than the default maxSendMessageSize (the default is
math.MaxInt32
), an error message will be displayed if the number exceeds - A data set written to a stream
Client
func printLists(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.List(context.Background(), r)
iferr ! = nil {return err
}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
}
iferr ! = nil {return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
}
return nil
}
Copy the code
On the Client side, focus on the stream.recv () method. When is io.eof? When is there an error message?
type StreamService_ListClient interface {
Recv() (*StreamResponse, error)
grpc.ClientStream
}
func (x *streamServiceListClient) Recv() (*StreamResponse, error) {
m := new(StreamResponse)
iferr := x.ClientStream.RecvMsg(m); err ! = nil {return nil, err
}
return m, nil
}
Copy the code
RecvMsg will read the entire gRPC message body from the stream.
(1) RecvMsg is blocked waiting
(2) RecvMsg Returns IO.EOF when the stream succeeds/terminates (Close is called)
(3) RecvMsg When any error occurs in the stream, the stream will be terminated, and the error message will contain RPC error code. The following error may occur in RecvMsg:
- io.EOF
- io.ErrUnexpectedEOF
- transport.ConnectionError
- google.golang.org/grpc/codes
Note that the default value of MaxReceiveMessageSize is 1024 * 1024 * 4. You are advised not to exceed the value
validation
Run stream_server/server. Go:
$ go run server.go
Copy the code
Run stream_client/client. Go:
$ go run client.go
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2018
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2019
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2020
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2021
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2022
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2023
2018/09/24 16:18:25 resp: pj.name: gRPC Stream Client: List, pt.value: 2024
Copy the code
2, Client-side Streaming RPC: Streaming RPC
Client streaming RPC, one-way flow, the client sends multiple RPC requests to the server through streaming, and the server sends a response to the client, as shown roughly:
Server
func (s *StreamService) Record(stream pb.StreamService_RecordServer) error {
for {
r, err := stream.Recv()
if err == io.EOF {
return stream.SendAndClose(&pb.StreamResponse{Pt: &pb.StreamPoint{Name: "gRPC Stream Server: Record", Value: 1}})
}
iferr ! = nil {return err
}
log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
}
return nil
}
Copy the code
SendAndClose = stream.sendandClose = stream.sendandClose = stream.sendandClose
In this program, we process each Recv, and when we find IO.EOF (stream closed), we need to send the final response to the client, and close the Recv that is waiting on the other side
Client
func printRecord(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.Record(context.Background())
iferr ! = nil {return err
}
for n := 0; n < 6; n++ {
err := stream.Send(r)
iferr ! = nil {return err
}
}
resp, err := stream.CloseAndRecv()
iferr ! = nil {return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
return nil
}
Copy the code
Stream. CloseAndRecv and stream.SendAndClose are the companion stream methods, and you are sure to know what they do
validation
Restart stream_server/server.go and run stream_client/client.go again:
Stream_client:
$ go run client.go
2018/09/24 16:23:03 resp: pj.name: gRPC Stream Server: Record, pt.value: 1
Copy the code
Stream_server:
$ go run server.go
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
2018/09/24 16:23:03 stream.Recv pt.name: gRPC Stream Client: Record, pt.value: 2018
Copy the code
3, Bidirectional Streaming RPC
Two-way streaming RPC, as the name implies, is two-way streaming. The client initiates the request in a streaming manner, and the server responds to the request in a streaming manner
The first request must be initiated by the Client, but the interaction mode (who comes first and who comes after, how much to send at a time, how much to respond to, and when to close) depends on how the program is written (can be combined with coroutines).
If the bidirectional stream is sent in sequence, it looks like this:
Again, it is important to emphasize that two-way flows vary greatly and vary from program to program. Bidirectional flow diagrams cannot be applied to different scenarios
Server
func (s *StreamService) Route(stream pb.StreamService_RouteServer) error {
n := 0
for {
err := stream.Send(&pb.StreamResponse{
Pt: &pb.StreamPoint{
Name: "gPRC Stream Client: Route",
Value: int32(n),
},
})
iferr ! = nil {return err
}
r, err := stream.Recv()
if err == io.EOF {
return nil
}
iferr ! = nil {return err
}
n++
log.Printf("stream.Recv pt.name: %s, pt.value: %d", r.Pt.Name, r.Pt.Value)
}
return nil
}
Copy the code
Client
func printRoute(client pb.StreamServiceClient, r *pb.StreamRequest) error {
stream, err := client.Route(context.Background())
iferr ! = nil {return err
}
for n := 0; n <= 6; n++ {
err = stream.Send(r)
iferr ! = nil {return err
}
resp, err := stream.Recv()
if err == io.EOF {
break
}
iferr ! = nil {return err
}
log.Printf("resp: pj.name: %s, pt.value: %d", resp.Pt.Name, resp.Pt.Value)
}
stream.CloseSend()
return nil
}
Copy the code
validation
Restart stream_server/server.go and run stream_client/client.go again:
stream_server
$ go run server.go
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
2018/09/24 16:29:43 stream.Recv pt.name: gRPC Stream Client: Route, pt.value: 2018
Copy the code
stream_client
$ go run client.go
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 0
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 1
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 2
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 3
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 4
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 5
2018/09/24 16:29:43 resp: pj.name: gPRC Stream Client: Route, pt.value: 6
Copy the code
conclusion
In this paper, three types of flow interaction modes are introduced. You can choose the appropriate mode according to the actual business scenarios. You’ll get twice the result with half the effort.
?
If you have any questions or mistakes, welcome to raise questions or give correction opinions on issues. If you like or are helpful to you, welcome Star, which is a kind of encouragement and promotion for the author.
My official account
reference
Sample code for this series
- go-grpc-example