Welcome to my GitHub

Github.com/zq2599/blog…

Content: a categorized index of all my original articles with supporting source code, covering Java, Docker, Kubernetes, DevOps, and more.

Overview of this article

  • This is the fifth article in the Java edition of the gRPC hands-on series. The goal is to master bidirectional-streaming services, in which both the request parameters and the response content are streams.
  • Let's start with how the official documentation describes bidirectional streaming RPC: both sides send a sequence of messages using a read-write stream. The two streams operate independently, so the client and server can read and write in whatever order they like: for example, the server could wait to receive all the client messages before writing its responses, or it could alternately read a message and then write a message, or use some other combination of reads and writes. The order of messages in each stream is preserved.
  • Once you have developed the client-streaming and server-streaming types, bidirectional streaming is easy to understand: it combines the previous two, so both requests and responses are handled as streams.
  • For today's hands-on exercise, let's design a feature for an online store: batch inventory deduction. The client submits several products with their quantities, and the server reports, for each product, whether the deduction succeeded or failed.
  • Let's get to the code as soon as possible. The steps are as follows:
  1. Define the bidirectional-streaming gRPC interface in the proto file, and generate Java code from the proto file
  2. Develop server applications
  3. Develop client applications
  4. Verification
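Before turning to gRPC itself, the ordering freedom described in the overview can be sketched in plain Java. This is only an illustration under stated assumptions: the nested `Observer` interface is a hypothetical stand-in for gRPC's `StreamObserver`, everything runs synchronously in one thread, and the names `replyPerMessage` and `replyAfterAll` are invented for this sketch. It shows two of the read/write combinations the documentation mentions: replying to each message as it arrives, and waiting for all messages before replying.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

class StreamOrderingSketch {

    // Hypothetical stand-in for gRPC's StreamObserver, so the sketch needs only the JDK.
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Strategy 1: reply to each request as soon as it arrives (reads and writes alternate).
    static Observer<Integer> replyPerMessage(Observer<String> responses) {
        return new Observer<Integer>() {
            @Override
            public void onNext(Integer value) {
                responses.onNext("reply-" + value);
            }

            @Override
            public void onCompleted() {
                responses.onCompleted();
            }
        };
    }

    // Strategy 2: buffer every request and reply only after the client has finished sending.
    static Observer<Integer> replyAfterAll(Observer<String> responses) {
        return new Observer<Integer>() {
            private final List<Integer> buffered = new ArrayList<>();

            @Override
            public void onNext(Integer value) {
                buffered.add(value);
            }

            @Override
            public void onCompleted() {
                for (Integer value : buffered) {
                    responses.onNext("reply-" + value);
                }
                responses.onCompleted();
            }
        };
    }

    // Sends 1..count through the given strategy and records the order of all events.
    static List<String> run(Function<Observer<String>, Observer<Integer>> strategy, int count) {
        List<String> events = new ArrayList<>();
        Observer<String> responses = new Observer<String>() {
            @Override
            public void onNext(String value) {
                events.add(value);
            }

            @Override
            public void onCompleted() {
                events.add("done");
            }
        };
        Observer<Integer> requests = strategy.apply(responses);
        for (int i = 1; i <= count; i++) {
            events.add("send-" + i);
            requests.onNext(i);
        }
        requests.onCompleted();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(StreamOrderingSketch::replyPerMessage, 2));
        System.out.println(run(StreamOrderingSketch::replyAfterAll, 2));
    }
}
```

In both strategies the replies arrive in the same order as the requests; only the interleaving of sends and replies differs. Real gRPC delivers these callbacks asynchronously over the network, which this single-threaded sketch does not model.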

Download the source code

  • The full source code for this article can be downloaded from GitHub. The addresses and link information are listed below (github.com/zq2599/blog…):
Name | Link | Note
Project home page | Github.com/zq2599/blog… | The project's home page on GitHub
Git repository address (HTTPS) | Github.com/zq2599/blog… | The project's source repository address, HTTPS protocol
Git repository address (SSH) | git@github.com:zq2599/blog_demos.git | The project's source repository address, SSH protocol
  • The Git project contains several folders. The source code for the gRPC Java series is in the grpc-tutorials folder, as shown in the red box below:

  • There are multiple directories under the grpc-tutorials folder. The server code for this article is in double-stream-server-side and the client code is in double-stream-client-side, as shown below:

Define the bidirectional-streaming gRPC interface in the proto file

  • The first step is to define the gRPC interface. Open mall.proto and add the method and its related data structures. Note that both the input parameter ProductOrder and the return value DeductReply of the BatchDeduct method carry the stream qualifier (ProductOrder was defined in the previous article), which marks this method as bidirectional streaming:
// gRPC service: the inventory service of an online store
service StockService {
    // Bidirectional streaming: batch inventory deduction
    rpc BatchDeduct (stream ProductOrder) returns (stream DeductReply) {}
}

// Data structure returned by an inventory deduction
message DeductReply {
    // Return code
    int32 code = 1;
    // Description
    string message = 2;
}
  • Double-click the task in the red box below to generate the Java code:

  • The generated files, shown in the red box below, contain the service definition and the data structure of the return value:

  • Next, develop the server.

Develop server applications

  • Create a module named double-stream-server-side under the parent project grpc-tutorials, with the following build.gradle:
// Use the Springboot plugin
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    // As a gRPC service provider, you need this library
    implementation 'net.devh:grpc-server-spring-boot-starter'
    // Depends on the project that contains the automatically generated source code
    implementation project(':grpc-lib')
    // annotationProcessor dependencies are not transitive, so every module that uses Lombok-generated code must declare the annotationProcessor itself
    annotationProcessor 'org.projectlombok:lombok'
}
  • Configuration file application.yml:
spring:
  application:
    name: double-stream-server-side
# gRPC configuration, here only need to configure the service port number
grpc:
  server:
    port: 9901
  • The startup class DoubleStreamServerSideApplication.java is not shown here; it is an ordinary Spring Boot startup class.
  • The focus is GrpcServerService.java, which provides the gRPC service. All we have to do is return an anonymous class to the framework; when its onNext and onCompleted methods are invoked is decided by the framework. The anonymous class also holds a totalCount member variable that accumulates the total quantity. Because the request parameter is a stream, the anonymous class's onNext is called multiple times. Because the return value is also a stream, onNext calls responseObserver.onNext to respond to each request in the stream, so the client keeps receiving response data from the server (that is, the client's onNext method is called multiple times):
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.server.service.GrpcService;

@GrpcService
@Slf4j
public class GrpcServerService extends StockServiceGrpc.StockServiceImplBase {

    @Override
    public StreamObserver<ProductOrder> batchDeduct(StreamObserver<DeductReply> responseObserver) {
        // Return an anonymous class for use by the upper-layer framework
        return new StreamObserver<ProductOrder>() {

            private int totalCount = 0;

            @Override
            public void onNext(ProductOrder value) {
                log.info("Processing product [{}], quantity [{}]",
                        value.getProductId(),
                        value.getNumber());

                // Accumulate the total quantity
                totalCount += value.getNumber();

                int code;
                String message;

                // Simulate a stock shortage: even quantities succeed, odd quantities fail
                if (0 == value.getNumber() % 2) {
                    code = 10000;
                    message = String.format("Product [%d]: deducting [%d] from inventory succeeded", value.getProductId(), value.getNumber());
                } else {
                    code = 10001;
                    message = String.format("Product [%d]: deducting [%d] from inventory failed", value.getProductId(), value.getNumber());
                }

                responseObserver.onNext(DeductReply.newBuilder()
                        .setCode(code)
                        .setMessage(message)
                        .build());
            }

            @Override
            public void onError(Throwable t) {
                log.error("Batch inventory deduction error", t);
            }

            @Override
            public void onCompleted() {
                log.info("Batch inventory deduction completed, [{}] items in total", totalCount);
                responseObserver.onCompleted();
            }
        };
    }
}
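One detail of the server code is worth isolating: because batchDeduct returns a new anonymous object on every call, the totalCount field belongs to a single stream and is not shared between calls. The following JDK-only sketch illustrates this under stated assumptions: `Observer` is a hypothetical stand-in for gRPC's `StreamObserver`, quantities are plain integers instead of ProductOrder messages, and the total is written to the response stream (the article's code only logs it) so that it can be observed.

```java
import java.util.ArrayList;
import java.util.List;

class PerStreamStateSketch {

    // Hypothetical stand-in for gRPC's StreamObserver (JDK only, not the real API).
    interface Observer<T> {
        void onNext(T value);
        void onCompleted();
    }

    // Mirrors the shape of batchDeduct: every call returns a NEW request observer,
    // so the running total below belongs to one stream only.
    static Observer<Integer> batchDeduct(Observer<String> responseObserver) {
        return new Observer<Integer>() {
            private int totalCount = 0;

            @Override
            public void onNext(Integer quantity) {
                totalCount += quantity;
                // Same simulated rule as the article: even quantities succeed, odd ones fail.
                responseObserver.onNext(quantity % 2 == 0 ? "10000" : "10001");
            }

            @Override
            public void onCompleted() {
                // Assumption for this sketch: expose the total as a final reply.
                responseObserver.onNext("total=" + totalCount);
                responseObserver.onCompleted();
            }
        };
    }

    // Collects everything written to the response stream into the given list.
    static Observer<String> collectInto(List<String> sink) {
        return new Observer<String>() {
            @Override
            public void onNext(String value) {
                sink.add(value);
            }

            @Override
            public void onCompleted() {
                // Nothing to release in this sketch.
            }
        };
    }

    // Opens one stream, sends the quantities, completes it, and returns all replies.
    static List<String> send(int... quantities) {
        List<String> replies = new ArrayList<>();
        Observer<Integer> requests = batchDeduct(collectInto(replies));
        for (int quantity : quantities) {
            requests.onNext(quantity);
        }
        requests.onCompleted();
        return replies;
    }

    public static void main(String[] args) {
        System.out.println(send(1, 2));
        System.out.println(send(4));
    }
}
```

The second stream reports a total of 4 rather than 7, because its observer starts with its own totalCount of zero.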

Develop client applications

  • Create a module named double-stream-client-side under the parent project grpc-tutorials, with the following build.gradle:
plugins {
    id 'org.springframework.boot'
}

dependencies {
    implementation 'org.projectlombok:lombok'
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    implementation 'net.devh:grpc-client-spring-boot-starter'
    implementation project(':grpc-lib')
}
  • Configuration file application.yml, which sets the client's own web port and the gRPC server address:
server:
  port: 8082
spring:
  application:
    name: double-stream-client-side

grpc:
  client:
    # Name of the gRPC configuration, referenced by the @GrpcClient annotation
    double-stream-server-side:
      # gRPC server address
      address: 'static://127.0.0.1:9901'
      enableKeepAlive: true
      keepAliveWithoutCalls: true
      negotiationType: plaintext
  • The startup class DoubleStreamClientSideApplication.java is not shown here; it is an ordinary Spring Boot startup class.

  • Normally a StreamObserver is used to handle the server's responses. Because the responses arrive asynchronously, we need an additional method to retrieve the business data from the StreamObserver afterwards. We therefore define a new interface that extends StreamObserver and adds a getExtra method returning a String; its use is shown later:

package com.bolingcavalry.grpctutorials;

import io.grpc.stub.StreamObserver;

public interface ExtendResponseObserver<T> extends StreamObserver<T> {
    String getExtra();
}
  • Now let's see how to call the bidirectional-streaming gRPC interface remotely; detailed comments have been added to the code:
package com.bolingcavalry.grpctutorials;

import com.bolingcavalry.grpctutorials.lib.DeductReply;
import com.bolingcavalry.grpctutorials.lib.ProductOrder;
import com.bolingcavalry.grpctutorials.lib.StockServiceGrpc;
import io.grpc.stub.StreamObserver;
import lombok.extern.slf4j.Slf4j;
import net.devh.boot.grpc.client.inject.GrpcClient;
import org.springframework.stereotype.Service;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@Service
@Slf4j
public class GrpcClientService {

    @GrpcClient("double-stream-server-side")
    private StockServiceGrpc.StockServiceStub stockServiceStub;

    /**
     * Deduct inventory in batches.
     * @param count number of product orders to send
     * @return all responses from the server, concatenated
     */
    public String batchDeduct(int count) {

        CountDownLatch countDownLatch = new CountDownLatch(1);

        // onNext and onCompleted of responseObserver are executed in another thread.
        // ExtendResponseObserver extends StreamObserver
        ExtendResponseObserver<DeductReply> responseObserver = new ExtendResponseObserver<DeductReply>() {

            // Use stringBuilder to save all responses from the server
            private StringBuilder stringBuilder = new StringBuilder();

            @Override
            public String getExtra() {
                return stringBuilder.toString();
            }

            /**
             * Called once for each response the server writes to the stream.
             * @param value one response from the server
             */
            @Override
            public void onNext(DeductReply value) {
                log.info("batch deduct on next");
                // Append to the member variable of the anonymous class
                stringBuilder.append(String.format("Return code [%d], return message: %s<br>",
                        value.getCode(),
                        value.getMessage()));
            }

            @Override
            public void onError(Throwable t) {
                log.error("batch deduct gRPC request error", t);
                stringBuilder.append("batch deduct gRPC error, " + t.getMessage());
                countDownLatch.countDown();
            }

            /**
             * Called after the server confirms that the response is complete.
             */
            @Override
            public void onCompleted() {
                log.info("batch deduct on complete");
                // After countDown, the thread blocked in countDownLatch.await
                // stops waiting and continues
                countDownLatch.countDown();
            }
        };

        // Start the remote call; no data has been sent to the server yet
        StreamObserver<ProductOrder> requestObserver = stockServiceStub.batchDeduct(responseObserver);

        for (int i = 0; i < count; i++) {
            // Each onNext call sends one message to the server,
            // and the server's onNext method is executed once
            requestObserver.onNext(build(101 + i, 1 + i));
        }

        // Tell the server that all data has been sent
        requestObserver.onCompleted();

        try {
            // responseObserver.onCompleted runs in another thread and calls countDownLatch.countDown there.
            // Once countDown has been called, the await below returns.
            // The await timeout is set to 2 seconds
            countDownLatch.await(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            log.error("countDownLatch await error", e);
        }

        log.info("service finish");

        // The content returned by the server is held by responseObserver and is available through getExtra
        return responseObserver.getExtra();
    }

    /**
     * Create a ProductOrder object.
     * @param productId product ID
     * @param num quantity
     * @return the ProductOrder
     */
    private static ProductOrder build(int productId, int num) {
        return ProductOrder.newBuilder().setProductId(productId).setNumber(num).build();
    }
}
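The client code waits on a CountDownLatch with a 2-second timeout and ignores the boolean that await returns, so a timeout and a normal completion look the same to the caller. The following JDK-only sketch shows one possible way to tell the two apart. It is an illustration, not the author's code: the worker thread stands in for the gRPC callback thread, and the method name `callAndWait`, the delays, and the marker strings are all invented for this example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LatchWaitSketch {

    // Waits up to timeoutMillis for a callback that fires on another thread after delayMillis.
    // Returns the collected text, or a marker if the wait timed out or was interrupted.
    static String callAndWait(long delayMillis, long timeoutMillis) {
        CountDownLatch latch = new CountDownLatch(1);
        // StringBuffer is synchronized; it is written by the worker and read by the caller.
        StringBuffer collected = new StringBuffer();

        Thread worker = new Thread(() -> {
            try {
                Thread.sleep(delayMillis);      // simulated network delay
                collected.append("completed");  // roughly what onNext would store
                latch.countDown();              // roughly what onCompleted would do
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true); // do not keep the JVM alive after a timeout
        worker.start();

        try {
            // await returns false if the timeout elapsed before countDown was called
            boolean finished = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return finished ? collected.toString() : "timed out";
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        }
    }

    public static void main(String[] args) {
        System.out.println(callAndWait(10, 2000));  // callback fires well before the timeout
        System.out.println(callAndWait(2000, 50));  // timeout elapses first
    }
}
```

Applied to the article's client, the same check would let batchDeduct report a timeout explicitly instead of returning whatever partial text had been collected so far.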
  • Finally, add a web endpoint so that the remote call can be verified through a web request:
package com.bolingcavalry.grpctutorials;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GrpcClientController {

    @Autowired
    private GrpcClientService grpcClientService;

    @RequestMapping("/")
    public String printMessage(@RequestParam(defaultValue = "1") int count) {
        return grpcClientService.batchDeduct(count);
    }
}
  • Coding is complete; let's start verification.

Verification

  • Start the server, DoubleStreamServerSideApplication:

  • Start the client, DoubleStreamClientSideApplication:

  • Enter http://localhost:8082/?count=10 in a browser. The response is shown below: the remote call to the gRPC service succeeds, and every response in the stream is received by the client:

  • The following is the server log, which shows the server processing the client's messages one by one:

  • The following is the client log. Thanks to the CountDownLatch, the thread that initiated the gRPC request waits until responseObserver.onCompleted has been executed in another thread, and only then continues:

  • At this point we have developed all four types of gRPC services and their clients, which is enough to cope with ordinary business scenarios. In the following articles we will dig deeper and look at how gRPC is used in more complex scenarios.
