Video information
gRPC: From Tutorial to Production by Alan Shreve at GopherCon 2017
www.youtube.com/watch?v=7FZ…
Blog: about.sourcegraph.com/go/grpc-in-…
How should microservices communicate with each other?
The answer is: SOAP… Just kidding, of course it isn’t SOAP.
The prevailing approach today is HTTP + JSON (a REST API).
Alan says, “If I never write another REST client library in my life, I will die a happy death… 😂”, because it is the most boring kind of work: doing the same thing over and over again.
Why is the REST API bad?
- Streaming is too hard to implement
- Bidirectional streaming is impossible
- Operations are hard to model
- It is inefficient: a text representation is not the best choice for the network
- Inside a service it isn’t really RESTful at all, just a set of HTTP endpoints
- It’s hard to fetch multiple resources in a single request (see GraphQL for a counterexample)
- There is no formal (machine-readable) API contract
- So writing clients requires humans
- And humans (👷) are expensive and don’t like writing clients
What is gRPC
gRPC is a high-performance, open-source, general-purpose RPC framework.
Rather than trying to define it further, let’s build something with it to make it concrete.
Build a caching service
With gRPC, we don’t start by writing Go code; we start by writing the service definition in gRPC’s IDL.
app.proto
syntax = "proto3";
package rpc;
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
}
message StoreReq {
string key = 1;
bytes val = 2;
}
message StoreResp {
}
message GetReq {
string key = 1;
}
message GetResp {
bytes val = 1;
}
Once this file is written, we immediately get client libraries in nine languages:
- C++
- Java(and Android)
- Python
- Go
- Ruby
- C#
- Javascript(node.js)
- Objective-C (iOS!)
- PHP
We also have server-side API stubs for seven languages:
- C++
- Java
- Python
- Go
- Ruby
- C#
- Javascript(node.js)
server.go
func serverMain() {
	if err := runServer(); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to run cache server: %s\n", err)
		os.Exit(1)
	}
}

func runServer() error {
	srv := grpc.NewServer()
	rpc.RegisterCacheServer(srv, &CacheService{})
	l, err := net.Listen("tcp", "localhost:5051")
	if err != nil {
		return err
	}
	// Serve blocks until the server stops.
	return srv.Serve(l)
}
Leave CacheService empty and implement it later.
type CacheService struct {
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
	return nil, fmt.Errorf("unimplemented")
}

func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
	return nil, fmt.Errorf("unimplemented")
}
client.go
func clientMain() {
	if err := runClient(); err != nil {
		fmt.Fprintf(os.Stderr, "failed: %v\n", err)
		os.Exit(1)
	}
}

func runClient() error {
	// Establish a connection to the port the server listens on.
	conn, err := grpc.Dial("localhost:5051", grpc.WithInsecure())
	if err != nil {
		return fmt.Errorf("failed to dial server: %v", err)
	}
	cache := rpc.NewCacheClient(conn)
	// Call the Store() RPC to store the key-value pair {"gopher": "con"}.
	_, err = cache.Store(context.Background(), &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
	if err != nil {
		return fmt.Errorf("failed to store: %v", err)
	}
	// Call the Get() RPC to fetch the value.
	resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
	if err != nil {
		return fmt.Errorf("failed to get: %v", err)
	}
	// Print the result.
	fmt.Printf("Got cached value %s\n", resp.Val)
	return nil
}
Isn’t that WSDL?
Some people may think this looks a lot like WSDL, and they are right: gRPC learned from the mistakes of SOAP/WSDL while keeping what was good about it.
- It is not tightly coupled to XML (gRPC’s serialization is pluggable, so different underlying encodings can be used)
- Anyone who has written XML/XSD knows how burdensome those service definitions are; gRPC doesn’t have this problem
- WSDL carries completely unnecessary complexity and features almost nobody needs (two-phase commit)
- WSDL is inflexible and not forward-compatible (unlike protobuf)
- SOAP/WSDL performs poorly and cannot stream
- But WSDL’s machine-readable API definitions are a good thing
Implement a specific CacheService
server.go
type CacheService struct {
store map[string][]byte
}
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
val := s.store[req.Key]
return &rpc.GetResp{Val: val}, nil
}
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
s.store[req.Key] = req.Val
return &rpc.StoreResp{}, nil
}
Note that there are no locks here. Imagine that there are, because these methods will be called concurrently.
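To make the note about locking concrete, here is a minimal sketch of what a lock-protected store could look like. It is not from the talk: lockedStore, newLockedStore, Get, and Set are hypothetical names, and the gRPC plumbing is left out so the sketch runs on its own.

```go
package main

import (
	"fmt"
	"sync"
)

// lockedStore is a hypothetical stand-in for CacheService's map,
// guarded by a read-write mutex so concurrent handlers are safe.
type lockedStore struct {
	mu    sync.RWMutex
	store map[string][]byte
}

func newLockedStore() *lockedStore {
	// The map must be initialized; writing to a nil map panics.
	return &lockedStore{store: make(map[string][]byte)}
}

// Get takes the read lock, so many readers can proceed at once.
func (s *lockedStore) Get(key string) ([]byte, bool) {
	s.mu.RLock()
	defer s.mu.RUnlock()
	val, ok := s.store[key]
	return val, ok
}

// Set takes the write lock, excluding readers and other writers.
func (s *lockedStore) Set(key string, val []byte) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.store[key] = val
}

func main() {
	s := newLockedStore()
	var wg sync.WaitGroup
	// Eight goroutines write concurrently, as concurrent RPC handlers would.
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			s.Set(fmt.Sprintf("key-%d", i), []byte("con"))
		}(i)
	}
	wg.Wait()
	val, ok := s.Get("key-3")
	fmt.Println(string(val), ok)
}
```

A read-write mutex is one reasonable choice for a read-heavy cache; a plain sync.Mutex would also be correct.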
Error handling
gRPC supports error handling, of course. Suppose we rewrite Get() to report an error for a nonexistent key:
func (s *CacheService) Get(ctx context.Context, req *rpc.GetReq) (*rpc.GetResp, error) {
	val, ok := s.store[req.Key]
	if !ok {
		return nil, status.Errorf(codes.NotFound, "Key not found %s", req.Key)
	}
	return &rpc.GetResp{Val: val}, nil
}
Encryption
If code like this were deployed, the SREs would block it, because all communication must be encrypted.
Adding TLS to gRPC is easy. For example, we modify runServer() to use TLS transport credentials.
func runServer() error {
	tlsCreds, err := credentials.NewServerTLSFromFile("tls.crt", "tls.key")
	if err != nil {
		return err
	}
	srv := grpc.NewServer(grpc.Creds(tlsCreds))
	...
}
We also need to modify runClient().
func runClient() error {
	// InsecureSkipVerify disables certificate verification; acceptable for a demo only.
	tlsCreds := credentials.NewTLS(&tls.Config{InsecureSkipVerify: true})
	conn, err := grpc.Dial("localhost:5051", grpc.WithTransportCredentials(tlsCreds))
	...
}
How gRPC works in production
- HTTP/2
- protobuf serialization (pluggable)
- The client opens a long-lived connection to the gRPC server
- Each RPC call gets a new HTTP/2 stream
- Multiple RPC calls can be in flight simultaneously
- Client-side and server-side streaming are supported
gRPC implementations
There are currently three high-performance, event-driven implementations:
- C
  - Ruby, Python, Node.js, PHP, C#, Objective-C, and C++ are all bindings to this C core
  - PHP binds to it via PECL
- Java
  - Netty + BoringSSL via JNI
- Go
  - Pure Go implementation, using the Go standard library’s crypto/tls
Where did gRPC come from
- It was originally created by a team at Google
- Its predecessor was an internal Google project called Stubby
- gRPC is Stubby’s open-source successor, and it’s not just Google using it; many companies contribute code
- Of course, Google remains a major contributor
Production environment case: Multi-tenant
After going into production, we found that some customers were generating a huge number of keys. When we asked, it turned out some of them wanted to cache everything, which is obviously bad for our cache service.
We want to limit this behavior, but the current system cannot. So we change the implementation to issue a token to each customer, which lets us cap the number of keys a given customer can create and prevents abuse. The service becomes a multi-tenant caching service.
As before, we start with the IDL and add an account_token to the interface.
message StoreReq {
string key = 1;
bytes val = 2;
string account_token = 3;
}
We also need a separate accounts service that reports how many cached keys an account is allowed:
service Accounts {
rpc GetByToken(GetByTokenReq) returns (GetByTokenResp) {}
}
message GetByTokenReq {
string token = 1;
}
message GetByTokenResp {
Account account = 1;
}
message Account {
int64 max_cache_keys = 1;
}
The new Accounts service has a GetByToken() method that takes a token and returns an Account, whose max_cache_keys field is the maximum number of keys that account may cache.
Now let’s modify client.go:
func runClient() error {
	...
	cache := rpc.NewCacheClient(conn)
	_, err = cache.Store(context.Background(), &rpc.StoreReq{
		AccountToken: "inconshreveable",
		Key:          "gopher",
		Val:          []byte("con"),
	})
	if err != nil {
		return fmt.Errorf("failed to store: %v", err)
	}
	...
}
The server-side changes are larger, but still manageable.
type CacheService struct {
accounts rpc.AccountsClient
store map[string][]byte
keysByAccount map[string]int64
}
Note that accounts is a gRPC client, because our service is itself a client of another gRPC service. So in the new Store() implementation, we first call that service through accounts to get the account information.
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
	// Call the accounts service to look up the account.
	resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
		Token: req.AccountToken,
	})
	if err != nil {
		return nil, err
	}
	// Check whether the account is already at its quota.
	if s.keysByAccount[req.AccountToken] >= resp.Account.MaxCacheKeys {
		return nil, status.Errorf(codes.FailedPrecondition, "Account %s exceeds max key limit %d", req.AccountToken, resp.Account.MaxCacheKeys)
	}
	// If the key does not exist yet, storing it adds a new key, so increment the counter.
	if _, ok := s.store[req.Key]; !ok {
		s.keysByAccount[req.AccountToken] += 1
	}
	// Save the key-value pair.
	s.store[req.Key] = req.Val
	return &rpc.StoreResp{}, nil
}
Production environment case: Performance
With that problem solved, the service is back to normal and no user can create too many keys. But soon we get new complaints: many users report that the new system is slow and does not meet the SLA.
We have no idea what is going on, and we realize the program has no observability. In other words, nothing measures performance-related data.
Let’s start with the simplest approach: adding logs.
We begin in client.go, timing each call and logging the result.
...
// Record the start time.
start := time.Now()
_, err = cache.Store(context.Background(), &rpc.StoreReq{
	AccountToken: "inconshreveable",
	Key:          "gopher",
	Val:          []byte("con"),
})
// Log how long cache.Store() took.
log.Printf("cache.Store duration %s", time.Since(start))
if err != nil {
	return fmt.Errorf("failed to store: %v", err)
}
// Reset the start time.
start = time.Now()
// Call the Get() RPC to fetch the value.
resp, err := cache.Get(context.Background(), &rpc.GetReq{Key: "gopher"})
// Log how long cache.Get() took.
log.Printf("cache.Get duration %s", time.Since(start))
if err != nil {
	return fmt.Errorf("failed to get: %v", err)
}
We do the same on the server side.
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
	// Record the start time.
	start := time.Now()
	// Call the accounts service to look up the account.
	resp, err := s.accounts.GetByToken(context.Background(), &rpc.GetByTokenReq{
		Token: req.AccountToken,
	})
	// Log how long accounts.GetByToken() took.
	log.Printf("accounts.GetByToken duration %s", time.Since(start))
	...
}
After these changes, we notice we are doing the same thing over and over. Is there a less tedious way? Looking through the gRPC documentation, we find something called a client interceptor.
It is essentially middleware, but on the client side. When the client makes an RPC call, the interceptor is invoked first, so it can wrap the call with extra behavior.
To implement this, we create a new file called interceptor.go:
func WithClientInterceptor() grpc.DialOption {
	return grpc.WithUnaryInterceptor(clientInterceptor)
}

func clientInterceptor(
	ctx context.Context,
	method string,
	req interface{},
	reply interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error {
	start := time.Now()
	err := invoker(ctx, method, req, reply, cc, opts...)
	log.Printf("invoke remote method=%s duration=%s error=%v", method, time.Since(start), err)
	return err
}
Once we have WithClientInterceptor(), we can register it in grpc.Dial().
client.go
func runClient() error {
...
conn, err := grpc.Dial("localhost:5051",
grpc.WithTransportCredentials(tlsCreds),
WithClientInterceptor())
...
}
Once registered, every gRPC call goes through clientInterceptor(), so all calls are timed without adding timing, metrics, and log output inside each function over and over again.
Having added metrics on the client side, the natural question is whether the server can do the same. The documentation says yes: there is a server interceptor.
Similarly, we add interceptor.go on the server with a ServerInterceptor() function.
func ServerInterceptor() grpc.ServerOption {
return grpc.UnaryInterceptor(serverInterceptor)
}
func serverInterceptor(
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
log.Printf("invoke server method=%s duration=%s error=%v",
info.FullMethod,
time.Since(start),
err)
return resp, err
}
As with the client, we need to register the middleware we defined at runServer().
func runServer() error {
...
srv := grpc.NewServer(grpc.Creds(tlsCreds), ServerInterceptor())
...
}
Production environment case: Timeout
After adding the logs, we finally see that /rpc.Accounts/GetByToken takes a long time. We need to set a timeout on this call.
server.go
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
	// Always call cancel to release the timer held by the context.
	accountsCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()
	resp, err := s.accounts.GetByToken(accountsCtx, &rpc.GetByTokenReq{
		Token: req.AccountToken,
	})
	...
}
This is easy with context.WithTimeout() from the standard library.
Production case: Context passing
Even after this change, customers still complain that the SLA is not met, and on reflection they are right. The accounts call alone may take up to 2 seconds, the client’s own call adds time, and other code in between takes time as well. Some customers also say they need one second, not two.
So let’s push the timeout setting out to the caller.
First, the client sets a time limit on each call:
client.go
func runClient() error {
	...
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	_, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
	...
	ctx, cancel = context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()
	resp, err := cache.Get(ctx, &rpc.GetReq{Key: "gopher"})
	...
}
Then, on the server side, we pass the context along, using the caller’s ctx directly.
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
Token: req.AccountToken,
})
...
}
Production environment example: gRPC Metadata
With all of the above out of the way, we can finally breathe a sigh of relief. But then customers come with a new request… 😅: can we add a dry-run flag, meaning “do everything you would normally do, except actually modify the key store”?
gRPC metadata, also known as gRPC headers, works like HTTP headers: it carries extra information alongside the request. Using metadata keeps the dry-run implementation concise. Instead of implementing dry-run flag checking separately in each RPC method, we can factor it out.
func (s *CacheService) Store(ctx context.Context, req *rpc.StoreReq) (*rpc.StoreResp, error) {
	resp, err := s.accounts.GetByToken(ctx, &rpc.GetByTokenReq{
		Token: req.AccountToken,
	})
	...
	if !dryRun(ctx) {
		if _, ok := s.store[req.Key]; !ok {
			s.keysByAccount[req.AccountToken] += 1
		}
		s.store[req.Key] = req.Val
	}
	return &rpc.StoreResp{}, nil
}

func dryRun(ctx context.Context) bool {
	// Current grpc-go reads server-side metadata with FromIncomingContext
	// (older releases called this metadata.FromContext).
	md, ok := metadata.FromIncomingContext(ctx)
	if !ok {
		return false
	}
	val, ok := md["dry-run"]
	if !ok {
		return false
	}
	if len(val) < 1 {
		return false
	}
	return val[0] == "1"
}
There is a trade-off, of course: metadata values are plain strings, so this generality gives up type checking.
On the client side, the dry-run value is added to the metadata when needed.
func runClient() error {
	...
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	// NewOutgoingContext is the current name of what older grpc-go releases called metadata.NewContext.
	ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("dry-run", "1"))
	_, err = cache.Store(ctx, &rpc.StoreReq{Key: "gopher", Val: []byte("con")})
	...
}
Production environment case: Retry
With dry run implemented, we hope for a rest, but the customers who complained about slowness complain again. Timeouts are now enforced and the SLA is met, yet the service is still slow and calls keep timing out instead of succeeding. Investigation shows the cause is somewhere on the network, and there is not much we can do about it. To solve the customers’ problem, we add a retry mechanism.
We could add retry logic around every gRPC call, or we can use the interceptor, as we did for timing.
func clientInterceptor(...) error {
	var (
		start    = time.Now()
		attempts = 0
		err      error
		backoff  retryBackOff
	)
	for {
		attempts += 1
		select {
		case <-ctx.Done():
			err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
		case <-backoff.Next():
			startAttempt := time.Now()
			err = invoker(ctx, method, req, reply, cc, opts...)
			if err != nil {
				log.Printf(...)
				continue
			}
		}
		break
	}
	log.Printf(...)
	return err
}
This looks good, and we are ready to ship. But the code is rejected in review: retrying non-idempotent operations would execute them multiple times and change the expected result.
So we have to distinguish idempotent from non-idempotent operations.
silo.FireZeMissiles(NotIdempotent(ctx), req)
Of course, gRPC has nothing like this built in. So we create our own marker, carried in the context, to indicate whether an operation is idempotent.
func NotIdempotent(ctx context.Context) context.Context {
	return context.WithValue(ctx, "idempotent", false)
}

func isIdempotent(ctx context.Context) bool {
	val, ok := ctx.Value("idempotent").(bool)
	if !ok {
		// Calls are treated as idempotent unless marked otherwise.
		return true
	}
	return val
}
Then we add an isIdempotent() check to our clientInterceptor() implementation:
func clientInterceptor(...) error {
	var (
		start    = time.Now()
		attempts = 0
		err      error
		backoff  retryBackOff
	)
	for {
		attempts += 1
		select {
		case <-ctx.Done():
			err = status.Errorf(codes.DeadlineExceeded, "timeout reached before next retry attempt")
		case <-backoff.Next():
			startAttempt := time.Now()
			err = invoker(ctx, method, req, reply, cc, opts...)
			if err != nil && isIdempotent(ctx) {
				log.Printf(...)
				continue
			}
		}
		break
	}
	log.Printf(...)
	return err
}
Now, when a call fails, the client retries only if the operation is idempotent; otherwise it does not retry. This avoids repeating non-idempotent operations.
Production case: Structuring error
Everything seems fine, so we deploy. After running for a while, something is wrong. Successful RPC calls behave correctly, and timeout retries work. But every failed RPC call reports a timeout rather than the real error. For example, a Get() for a missing key should return an error, and a Store() that exceeds its quota should return an error, yet neither shows up in the log; we see a timeout instead.
Analysis shows that the server reports the error correctly, but the client does not return it to the caller. Instead, the client code starts retrying. The problem seems to be in the retry code.
err = invoker(ctx, method, req, reply, cc, opts...)
if err != nil && isIdempotent(ctx) {
	log.Printf(...)
	continue
}
Look closely and you’ll see that we retry whenever err is non-nil, whatever the error is. That is wrong: we should retry only on certain errors, such as network problems, and not on errors that should be returned to the caller.
So the question becomes: how do we inspect err to decide whether to retry?
- Use error codes: retry on some codes but not others. This would require defining custom gRPC error codes.
- Define an error type that carries a flag saying whether a retry is worthwhile.
- Or put the error code in the response message, so that every message carries a code we define that indicates whether to retry.
We therefore need a full structured error, not just an error code and a string. It is a harder road, but having come this far, a little persistence will get us through.
Again we start with the IDL:
message Error {
int64 code = 1;
string message = 2;
bool temporary = 3;
int64 userErrorCode = 4;
}
Then we implement the error interface on this Error type.
rpc/error.go
func (e *Error) Error() string {
	return e.Message
}

func Errorf(code codes.Code, temporary bool, msg string, args ...interface{}) error {
	return &Error{
		Code:      int64(code),
		Message:   fmt.Sprintf(msg, args...),
		Temporary: temporary,
	}
}
With these two functions we can construct an Error and print it, but how do we get it back to the client? This is where things get tedious:
rpc/error.go
func MarshalError(err error, ctx context.Context) error {
	rerr, ok := err.(*Error)
	if !ok {
		return err
	}
	pberr, marshalerr := pb.Marshal(rerr)
	if marshalerr == nil {
		// The serialized error is base64-encoded so it can travel as a metadata string.
		md := metadata.Pairs("rpc-error", base64.StdEncoding.EncodeToString(pberr))
		_ = grpc.SetTrailer(ctx, md)
	}
	// status.Error avoids treating the message as a format string.
	return status.Error(codes.Code(rerr.Code), rerr.Message)
}

func UnmarshalError(err error, md metadata.MD) *Error {
	vals, ok := md["rpc-error"]
	if !ok || len(vals) == 0 {
		return nil
	}
	buf, err := base64.StdEncoding.DecodeString(vals[0])
	if err != nil {
		return nil
	}
	var rerr Error
	if err := pb.Unmarshal(buf, &rerr); err != nil {
		return nil
	}
	return &rerr
}
interceptor.go
func serverInterceptor (
ctx context.Context,
req interface{},
info *grpc.UnaryServerInfo,
handler grpc.UnaryHandler,
) (interface{}, error) {
start := time.Now()
resp, err := handler(ctx, req)
err = rpc.MarshalError(err, ctx)
log.Print(...)
return resp, err
}
It’s ugly, but it works.
This is a workaround for gRPC’s lack of support for rich errors: with it, structured errors can be passed across host boundaries.
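The encode-into-metadata round trip can be exercised without gRPC or generated protobuf code. In this sketch JSON stands in for protobuf and a plain map stands in for gRPC metadata; wireError, md, encode, and decode are hypothetical names used only for illustration.

```go
package main

import (
	"encoding/base64"
	"encoding/json"
	"fmt"
)

// wireError mirrors the Error message. JSON stands in for protobuf here
// purely so the sketch needs no generated code.
type wireError struct {
	Code      int64  `json:"code"`
	Message   string `json:"message"`
	Temporary bool   `json:"temporary"`
}

// md is a simplified stand-in for gRPC metadata: string keys, string values.
type md map[string][]string

const errKey = "rpc-error"

// encode serializes e and stores it, base64-encoded, under errKey.
func encode(e wireError) (md, error) {
	raw, err := json.Marshal(e)
	if err != nil {
		return nil, err
	}
	return md{errKey: {base64.StdEncoding.EncodeToString(raw)}}, nil
}

// decode reverses encode; ok is false if the key is absent or malformed.
func decode(m md) (e wireError, ok bool) {
	vals := m[errKey]
	if len(vals) == 0 {
		return e, false
	}
	raw, err := base64.StdEncoding.DecodeString(vals[0])
	if err != nil {
		return e, false
	}
	if err := json.Unmarshal(raw, &e); err != nil {
		return wireError{}, false
	}
	return e, true
}

func main() {
	m, err := encode(wireError{Code: 14, Message: "backend unavailable", Temporary: true})
	if err != nil {
		panic(err)
	}
	got, ok := decode(m)
	fmt.Println(got.Message, got.Temporary, ok)
}
```

On the client, the decoded Temporary flag is what the retry logic would consult before trying again.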
Production environment example: Dump
Customers come with another request: we can store and we can fetch, but how do we get all of the data out? So we need to implement Dump(), which returns everything in the cache.
As before, we start with the IDL and add a Dump() method:
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
rpc Dump(DumpReq) returns (DumpResp) {}
}
message DumpReq{
}
message DumpResp {
repeated DumpItem items = 1;
}
message DumpItem {
string key = 1;
bytes val = 2;
}
The items field of DumpResp is declared repeated, which is what protobuf calls an array.
Production environment case: flow control
Once Dump was launched, it turned out everyone loved it, and so many people were calling it that the server ran out of memory. We need some way to limit the traffic.
The documentation shows that we can limit how many requests are served concurrently and how frequently the service can be called.
server.go
func runServer() error {
	...
	srv := grpc.NewServer(grpc.Creds(tlsCreds),
		ServerInterceptor(),
		grpc.MaxConcurrentStreams(64),
		grpc.InTapHandle(NewTap().Handler))
	rpc.RegisterCacheServer(srv, NewCacheService(accounts))
	l, err := net.Listen("tcp", "localhost:5051")
	if err != nil {
		return err
	}
	l = netutil.LimitListener(l, 1024)
	return srv.Serve(l)
}
Here netutil.LimitListener(l, 1024) caps the total number of connections, and grpc.MaxConcurrentStreams(64) caps the number of concurrent streams on each gRPC connection. Together they bound the total number of concurrent requests at 1024 × 64 = 65,536.
But gRPC has no built-in setting for how frequently the service may be called. So grpc.InTapHandle(NewTap().Handler) installs a custom tap handler, which runs early, before the request reaches the interceptors and handlers.
tap.go
type Tap struct {
	lim *rate.Limiter
}

func NewTap() *Tap {
	// Allow 150 requests per second with a burst of 5.
	return &Tap{lim: rate.NewLimiter(150, 5)}
}

func (t *Tap) Handler(ctx context.Context, info *tap.Info) (context.Context, error) {
	if !t.lim.Allow() {
		return nil, status.Errorf(codes.ResourceExhausted, "service is over rate limit")
	}
	return ctx, nil
}
Production environment case: Streaming
With that deployed, memory use finally dropped. But before we could take a break, the cache service kept growing in popularity and memory ran short again. We began to wonder whether we could change the design so that, instead of building the full array in memory on every Dump, we stream the items back as they are needed.
app.proto
syntax = "proto3";
package rpc;
service Cache {
rpc Store(StoreReq) returns (StoreResp) {}
rpc Get(GetReq) returns (GetResp) {}
rpc Dump(DumpReq) returns (stream DumpItem) {}
}
message DumpReq{
}
message DumpItem {
string key = 1;
bytes val = 2;
}
Instead of a repeated field, we use a stream. The client calls Dump() and the results are sent back as a stream.
server.go
func (s *CacheService) Dump(req *rpc.DumpReq, stream rpc.Cache_DumpServer) error {
	for k, v := range s.store {
		// Stop as soon as a send fails, for example because the client went away.
		if err := stream.Send(&rpc.DumpItem{Key: k, Val: v}); err != nil {
			return err
		}
	}
	return nil
}
We modify Dump() to send each record to the stream with stream.Send().
Notice that the handler takes no context parameter, just a stream; the context is available through stream.Context().
client.go
func runClient() error {
	...
	stream, err := cache.Dump(context.Background(), &rpc.DumpReq{})
	if err != nil {
		return fmt.Errorf("failed to dump: %v", err)
	}
	for {
		item, err := stream.Recv()
		if err == io.EOF {
			// The server has finished sending.
			break
		}
		if err != nil {
			return fmt.Errorf("failed to stream item: %v", err)
		}
		_ = item // process the item here
	}
	return nil
}
Production environment case: horizontal scaling, load balancing
With streaming, server performance improved a lot. But the service was so popular that, as users kept arriving, memory ran out again. Reviewing the code, we felt we had done everything we could on one machine; it was time to scale from a single server to several and load-balance between them.
gRPC communicates over long-lived connections, so once a client connects to a gRPC endpoint, it stays connected to that one server. Putting several servers behind a connection-level load balancer therefore does nothing for a single client: however many requests it sends, they all travel over the same connection to the same server and are never spread across the others.
For a client to benefit from multiple servers, it has to be smarter: it must know that several replicas exist and open connections to each of them, so that even a single client’s requests are balanced across the scaled-out servers.
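One simple way for such a "smarter client" to spread requests is round-robin selection over its backends. The sketch below shows only the selection step, under stated assumptions: roundRobin and pick are hypothetical names, the addresses are made up, and in a real client each entry would be an open connection rather than a string.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// roundRobin is a hypothetical client-side picker over several backends.
type roundRobin struct {
	backends []string
	next     uint64
}

// pick returns the backends in rotation; it is safe for concurrent use
// because the counter is advanced atomically.
func (r *roundRobin) pick() string {
	n := atomic.AddUint64(&r.next, 1)
	return r.backends[(n-1)%uint64(len(r.backends))]
}

func main() {
	// Hypothetical replica addresses.
	rr := &roundRobin{backends: []string{"cache-0:5051", "cache-1:5051", "cache-2:5051"}}
	for i := 0; i < 4; i++ {
		fmt.Println(rr.pick())
	}
}
```

Each RPC would call pick() to choose its connection, so consecutive requests from one client land on different servers.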
Production environment case: multilingual collaboration
In a complex environment, our gRPC clients (and even servers) may be written in different languages. This is one of gRPC’s strengths: cross-language communication is relatively easy.
For example, we could write a Python client:
import grpc
import rpc_pb2 as rpc

channel = grpc.insecure_channel('localhost:5051')
cache_svc = rpc.CacheStub(channel)
resp = cache_svc.Get(rpc.GetReq(key="gopher"))
print(resp.val)
One wart is that, although gRPC makes cross-language communication convenient, naming across the language implementations is arbitrary: Go has CacheClient while Python has CacheStub. There is no particular reason for the difference; different authors simply implemented things their own way.
Where gRPC is not perfect
- Load balancing
- Structured error message
- There is no support for browser JS (which in some ways is the most commonly used client)
- API changes happen frequently (even after 1.0)
- Some language implementations are poorly documented
- There is no standardization across languages
Use cases for gRPC in a production environment
- ngrok: all 20+ internal services communicate over gRPC
- Square: replaced all internal communication with gRPC; an early user of and contributor to gRPC
- CoreOS: etcd v3 is built entirely on gRPC
- Google: Google Cloud services (PubSub, Speech Rec) use gRPC
- Netflix, Yik Yak, VSCO, Cockroach, …
The future of gRPC
- To see what’s coming, check out:
  - grpc/proposal
  - the grpc-io mailing list
- New language support (Swift and Haskell are experimental)
- Stability, reliability, and performance improvements
- More fine-grained APIs to support custom behavior (connection management, channel tracking)
- Browser JS
This article is reprinted from: blog.lab99.org/post/golang…
Personal GitHub:
github.com/jiankunking
Personal Blog:
jiankunking.com