
Preface

The synchronous stub did not meet our requirement, because we needed to send a compressed file of about 200 MB. After further investigation, client-side streaming turned out to solve this problem well. Note: streaming services are essentially implemented through the asynchronous stub; the server and the client only need to implement the observer interface (StreamObserver) to handle the business logic.


One, what is gRPC?

In gRPC, a client application can call methods of a server application on a different machine as directly as if it were a local object, making it easier for you to create distributed applications and services. Like many RPC systems, gRPC is based on the idea of defining a service and specifying the methods (including parameters and return types) that can be called remotely. The server implements this interface and runs a gRPC server to handle client calls. The client holds a stub that provides the same methods as the server.
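The idea of a stub that "looks like" the server can be illustrated without gRPC at all. The sketch below is only an analogy built on the JDK's dynamic proxies: `Greeter`, `GreeterImpl`, and `newStub` are hypothetical names, and real gRPC stubs are generated from the proto file rather than created this way. Where the sketch calls the target directly, a real stub would serialize the request and send it over the network.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy;

class LocalStubSketch {
    /** Hypothetical service contract, standing in for what a proto file would define. */
    interface Greeter {
        String greet(String name);
    }

    /** "Server side": the real implementation of the contract. */
    static class GreeterImpl implements Greeter {
        @Override
        public String greet(String name) {
            return "Hello, " + name;
        }
    }

    /**
     * "Client side": builds a stub that implements the same interface and
     * forwards every call to the target object.
     */
    static Greeter newStub(Greeter target) {
        InvocationHandler forward = (proxy, method, args) -> method.invoke(target, args);
        return (Greeter) Proxy.newProxyInstance(
                Greeter.class.getClassLoader(), new Class<?>[] {Greeter.class}, forward);
    }

    public static void main(String[] args) {
        Greeter stub = newStub(new GreeterImpl());
        // The caller uses the stub exactly like a local object
        System.out.println(stub.greet("gRPC"));
    }
}
```

The caller only ever sees the `Greeter` interface, which is the property the paragraph above describes.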

Two, a simple gRPC interface

A gRPC interface consists of the following parts. Note: a synchronous blocking stub is used here because streaming is not required.

1. Define the proto file

service RouteGuide {
  // A simple RPC interface
  rpc GetFeature(Point) returns (Feature) {}
}

message Point {
  int32 latitude = 1;
  int32 longitude = 2;
}

message Feature {
  string name = 1;
  Point location = 2;
}

2. Server-side code

@Override
public void getFeature(Point request, StreamObserver<Feature> responseObserver) {
    // Unary call: send exactly one response, then complete
    responseObserver.onNext(checkFeature(request));
    responseObserver.onCompleted();
}

3. Client code

public void getFeature(int lat, int lon) {
        Point request = Point.newBuilder().setLatitude(lat).setLongitude(lon).build();

        Feature feature = blockingStub.getFeature(request);
        // Verify the returned data
        if (RouteGuideUtil.exists(feature)) {
            info("Found feature called \"{0}\" at {1}, {2}",
                    feature.getName(),
                    RouteGuideUtil.getLatitude(feature.getLocation()),
                    RouteGuideUtil.getLongitude(feature.getLocation()));
        } else {
            info("Found no feature at {0}, {1}",
                    RouteGuideUtil.getLatitude(feature.getLocation()),
                    RouteGuideUtil.getLongitude(feature.getLocation()));
        }
    }

Three, the streaming service interface

Client streaming is used here to implement file transfer.

1. The proto file

service RouteGuide {
  // Client file stream example
  rpc sendFile(stream FileInfo) returns (Info) {}
}

message FileInfo {
  int32 index = 1;
  bytes arrs = 2;
}

message Info {
  string msg = 1;
}

2. Server-side code

		// Test the file stream
        @Override
        public StreamObserver<FileInfo> sendFile(StreamObserver<Info> responseObserver) {

            try {
                return new StreamObserver<FileInfo>() {
                    final long startTime = System.nanoTime();
                    OutputStream os = new FileOutputStream(new File(System.currentTimeMillis() + ".zip"));
                    @Override
                    public void onNext(FileInfo fileInfo) {
                        try {
                            logger.log(Level.INFO, "File stream received");
                            fileInfo.getArrs().writeTo(os);
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                    }

                    @Override
                    public void onError(Throwable t) {
                        System.out.println("Error occurred");
                        logger.log(Level.WARNING, "sendFile cancelled");
                    }

                    @Override
                    public void onCompleted() {
                        System.out.println("Complete");
                        // Close the output stream
                        try {
                            os.close();
                        } catch (IOException e) {
                            e.printStackTrace();
                        }
                        long seconds = NANOSECONDS.toSeconds(System.nanoTime() - startTime);
                        responseObserver.onNext(Info.newBuilder().setMsg("success, spend time :" + seconds).build());
                        responseObserver.onCompleted();
                    }
                };
            } catch (Exception e) {
                e.printStackTrace();
            }
            return null;
        }

3. Client code

public void sendFile() {
        final CountDownLatch finishLatch = new CountDownLatch(1);
        StreamObserver<Info> responseObserver = new StreamObserver<Info>() {
            @Override
            public void onNext(Info info) {
                logger.info("end :"+info.getMsg());
            }

            @Override
            public void onError(Throwable t) {
                warning("sendFile Failed: {0}", Status.fromThrowable(t));
                if (testHelper != null) {
                    testHelper.onRpcError(t);
                }
                finishLatch.countDown();
            }

            @Override
            public void onCompleted() {
                info("Finished RecordRoute");
                finishLatch.countDown();
            }
        };

        StreamObserver<FileInfo> requestObserver = asyncStub.sendFile(responseObserver);
        // try-with-resources closes the input file even if reading fails
        try (InputStream is = new FileInputStream(new File("/home/test/test.zip"))) {
            byte[] buff = new byte[2048];
            int len;
            int index = 0;
            while ((len = is.read(buff)) != -1) {
                // Send only the bytes actually read; the last chunk is usually shorter than the buffer
                requestObserver.onNext(FileInfo.newBuilder()
                        .setIndex(index++)
                        .setArrs(ByteString.copyFrom(buff, 0, len))
                        .build());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Mark the end of requests
        requestObserver.onCompleted();

        // Receiving happens asynchronously
        try {
            if (!finishLatch.await(1, TimeUnit.MINUTES)) {
                warning("send file can not finish within 1 minutes");
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        logger.info("sendFile success");
    }
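A detail worth checking in any chunked upload is the length of the last chunk: `read` fills only part of the buffer when fewer bytes remain, so each message should carry only the bytes that were actually read. The sketch below is a minimal illustration using only the JDK. `ChunkingSketch`, `readChunks`, and `reassemble` are hypothetical names, and plain `byte[]` arrays stand in for the `FileInfo` messages.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class ChunkingSketch {
    /** Splits the stream into chunks of at most chunkSize bytes, keeping only the bytes actually read. */
    static List<byte[]> readChunks(InputStream in, int chunkSize) throws IOException {
        List<byte[]> chunks = new ArrayList<>();
        byte[] buff = new byte[chunkSize];
        int len;
        while ((len = in.read(buff)) != -1) {
            // Trim the chunk to len so stale bytes left in the buffer are never sent
            chunks.add(Arrays.copyOf(buff, len));
        }
        return chunks;
    }

    /** Concatenates the chunks in order, as a receiver does when it appends each message to a file. */
    static byte[] reassemble(List<byte[]> chunks) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (byte[] chunk : chunks) {
            out.write(chunk);
        }
        return out.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] original = new byte[5000];
        for (int i = 0; i < original.length; i++) {
            original[i] = (byte) i;
        }
        List<byte[]> chunks = readChunks(new ByteArrayInputStream(original), 2048);
        System.out.println("chunks: " + chunks.size());
        System.out.println("round trip ok: " + Arrays.equals(original, reassemble(chunks)));
    }
}
```

With protobuf, `ByteString.copyFrom(buff, 0, len)` plays the role that `Arrays.copyOf(buff, len)` plays here.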

4. Running result

Four, how the streaming service works

Let's look directly at the source code.

ServerCalls.class (io.grpc:grpc-stub:1.26.0)

public Listener<ReqT> startCall(ServerCall<ReqT, RespT> call, Metadata headers) {
            ServerCalls.ServerCallStreamObserverImpl<ReqT, RespT> responseObserver = new ServerCalls.ServerCallStreamObserverImpl(call);
            // Invoke the StreamObserver method from RouteGuideGrpc
            StreamObserver<ReqT> requestObserver = this.method.invoke(responseObserver);
            responseObserver.freeze();
            if (responseObserver.autoFlowControlEnabled) {
                call.request(1);
            }
            // Add the observer to the context
            return new ServerCalls.StreamingServerCallHandler.StreamingServerCallListener(requestObserver, responseObserver, call);
        }
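The handshake in `startCall` (invoke the service method once to obtain the request observer, then feed incoming messages to it) can be modelled with plain Java. This is a simplified sketch, not gRPC code: `Observer` is a hypothetical stand-in for `StreamObserver`, `dispatch` plays the role of the framework, and flow control and error handling are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

class ClientStreamingSketch {
    /** Hypothetical stand-in for StreamObserver, reduced to its three callbacks. */
    interface Observer<T> {
        void onNext(T value);
        void onError(Throwable t);
        void onCompleted();
    }

    /** Service method: receives the response observer and returns the observer for incoming requests. */
    static Observer<String> collect(Observer<Integer> responseObserver) {
        return new Observer<String>() {
            private int totalLength = 0;

            @Override public void onNext(String chunk) { totalLength += chunk.length(); }

            @Override public void onError(Throwable t) { /* nothing to clean up in this sketch */ }

            @Override public void onCompleted() {
                // Client streaming: a single response, sent after the last request
                responseObserver.onNext(totalLength);
                responseObserver.onCompleted();
            }
        };
    }

    /** Plays the role of the framework: invoke the method once, then deliver the messages. */
    static <ReqT, RespT> void dispatch(Function<Observer<RespT>, Observer<ReqT>> method,
            List<ReqT> messages, Observer<RespT> responseObserver) {
        Observer<ReqT> requestObserver = method.apply(responseObserver);
        for (ReqT message : messages) {
            requestObserver.onNext(message);
        }
        requestObserver.onCompleted();
    }

    /** Runs one call and records what the client-side response observer sees. */
    static List<String> run(List<String> chunks) {
        List<String> events = new ArrayList<>();
        dispatch(ClientStreamingSketch::collect, chunks, new Observer<Integer>() {
            @Override public void onNext(Integer v) { events.add("response " + v); }
            @Override public void onError(Throwable t) { events.add("error"); }
            @Override public void onCompleted() { events.add("completed"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("ab", "cde", "f"))); // prints [response 6, completed]
    }
}
```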

ServerCallImpl.class (io.grpc:grpc-core:1.26.0)

public ServerStreamListenerImpl(ServerCallImpl<ReqT, ?> call, Listener<ReqT> listener, CancellableContext context) {
            // Null checks
            this.call = (ServerCallImpl)Preconditions.checkNotNull(call, "call");
            this.listener = (Listener)Preconditions.checkNotNull(listener, "listener must not be null");
            this.context = (CancellableContext)Preconditions.checkNotNull(context, "context");
            // Actually add
            this.context.addListener(new CancellationListener() {
                public void cancelled(Context context) {
                    ServerStreamListenerImpl.this.call.cancelled = true;
                }
            }, MoreExecutors.directExecutor());
        }

Context.class (io.grpc:grpc-context:1.26.0)

public void addListener(Context.CancellationListener cancellationListener, Executor executor) {
        checkNotNull(cancellationListener, "cancellationListener");
        checkNotNull(executor, "executor");
        if (this.canBeCancelled()) {
            Context.ExecutableListener executableListener = new Context.ExecutableListener(executor, cancellationListener);
            synchronized(this) {
                if (this.isCancelled()) {
                    executableListener.deliver();
                } else if (this.listeners == null) {
                    this.listeners = new ArrayList();
                    this.listeners.add(executableListener);
                    if (this.cancellableAncestor != null) {
                        this.cancellableAncestor.addListener(this.parentListener, Context.DirectExecutor.INSTANCE);
                    }
                } else {
                    this.listeners.add(executableListener);
                }
            }
        }
    }
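The core rule in `addListener` is that a listener registered before cancellation is stored and notified later, while a listener registered after cancellation is delivered immediately. A simplified model of that rule, using only the JDK, might look like the following. `CancellationSketch` and its methods are hypothetical, and the propagation to a cancellable ancestor context is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;

class CancellationSketch {
    private final List<Runnable> listeners = new ArrayList<>();
    private boolean cancelled;

    /** Registers a listener; if cancellation already happened, the listener is delivered right away. */
    void addListener(Runnable listener, Executor executor) {
        Runnable delivery = () -> executor.execute(listener);
        synchronized (this) {
            if (!cancelled) {
                listeners.add(delivery);
                return;
            }
        }
        delivery.run();
    }

    /** Marks the context as cancelled and notifies every registered listener once. */
    void cancel() {
        List<Runnable> toNotify;
        synchronized (this) {
            if (cancelled) {
                return;
            }
            cancelled = true;
            toNotify = new ArrayList<>(listeners);
            listeners.clear();
        }
        for (Runnable delivery : toNotify) {
            delivery.run();
        }
    }

    /** Registers one listener before and one after cancellation and records the order of delivery. */
    static List<String> demo() {
        List<String> log = new ArrayList<>();
        CancellationSketch context = new CancellationSketch();
        Executor direct = Runnable::run; // runs the listener on the calling thread
        context.addListener(() -> log.add("early listener"), direct);
        context.cancel();
        context.addListener(() -> log.add("late listener"), direct);
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [early listener, late listener]
    }
}
```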

3. Each time a message arrives from the client, ServerImpl.class (io.grpc:grpc-core:1.26.0) calls getListener().messagesAvailable(producer)

public void messagesAvailable(final MessageProducer producer) {
      PerfMark.startTask("ServerStreamListener.messagesAvailable", tag);
      final Link link = PerfMark.linkOut();

      final class MessagesAvailable extends ContextRunnable {

        MessagesAvailable() {
          super(context);
        }

        @Override
        public void runInContext() {
          PerfMark.startTask("ServerCallListener(app).messagesAvailable", tag);
          PerfMark.linkIn(link);
          try {
          	// Get the observer in the current listener queue and consume messages from the client
            getListener().messagesAvailable(producer);
          } catch (RuntimeException e) {
            internalClose();
            throw e;
          } catch (Error e) {
            internalClose();
            throw e;
          } finally {
            PerfMark.stopTask("ServerCallListener(app).messagesAvailable", tag);
          }
        }
      }

      try {
        callExecutor.execute(new MessagesAvailable());
      } finally {
        PerfMark.stopTask("ServerStreamListener.messagesAvailable", tag);
      }
    }
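What `messagesAvailable` does, in essence, is hand the delivery over to `callExecutor` instead of running the application listener on the calling thread. The following is a minimal sketch of that handoff using only the JDK. `HandoffSketch` and `deliver` are hypothetical names, a single-thread executor is an assumption made here to keep the order of messages deterministic, and the PerfMark tracing and `internalClose()` error path of the real code are omitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class HandoffSketch {
    /** Delivers each message to the listener on callExecutor and returns what the listener received. */
    static List<String> deliver(List<String> messages) throws InterruptedException {
        List<String> received = Collections.synchronizedList(new ArrayList<>());
        Consumer<String> listener = received::add; // stands in for the application observer
        ExecutorService callExecutor = Executors.newSingleThreadExecutor();
        try {
            for (String message : messages) {
                // The caller only schedules the work; the listener itself runs on callExecutor
                callExecutor.execute(() -> listener.accept(message));
            }
        } finally {
            callExecutor.shutdown();
        }
        if (!callExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("listener did not finish in time");
        }
        return new ArrayList<>(received);
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(deliver(Arrays.asList("chunk-0", "chunk-1", "chunk-2")));
    }
}
```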

Five, summary

