Create client with remote call

FramedClientConnector connector = new FramedClientConnector(new InetSocketAddress(8081));
ThriftClientManager manager = new ThriftClientManager(new ThriftCodecManager(), new NiftyClient(), ImmutableSet.of());

EchoService.Iface client = manager.createClient(connector, EchoService.Iface.class).get();
String answer = client.echo("abc".new OperationActivityRequest().setKeywords("htae"));
System.out.println(answer);
Copy the code

Create a client to get the proxy class EchoService.iface, which can then be used as a local method to make remote calls; Seeing the core component in the code is

  1. FramedClientConnector: Relating to connections, including protocols, channels, etc.
  2. NiftyClient: Related to the client
  3. ThriftClientManager: Relates to client management

Quick understanding of important components

This section will briefly look at the functions of a few components, and the details will be discussed when analyzing the creation of the client

FramedClientConnector

The client connector is a SocketAddress and TDuplexProtocolFactory. Three important approaches are provided

  • NewThriftClientChannel: Obtains the FramedClientChannel, through which the request request can be written out.
  • NewChannelPipelineFactory: obtain the netty pipeline, used for subsequent creation netty client;
  • Connect: Connects the Netty client to the server

The top-level interface is as follows:

public interface NiftyClientConnector<T extends RequestChannel> {
    ChannelFuture connect(ClientBootstrap bootstrap);

    T newThriftClientChannel(Channel channel, NettyClientConfig clientConfig);

    ChannelPipelineFactory newChannelPipelineFactory(int maxFrameSize, NettyClientConfig clientConfig);
}
Copy the code

As you can see, the last two methods are related to Netty. The first method encapsulates the Netty channel and writes the request. In addition, FramedClientChannel inherits from AbstractClientChannel, which is very important as a Netty processor, more on this later.

NiftyClient

The name of the class is Nifty client related. Its internal properties are mainly netty related parameters, boss thread pool, worker thread pool, channel thread group ChannelGroup, Netty configuration class NettyClientConfig, And NioClientSocketChannelFactory, first to see constructor

public NiftyClient(a){
    this(NettyClientConfig.newBuilder().build());
}

public NiftyClient(NettyClientConfig nettyClientConfig)
{
    this.nettyClientConfig = nettyClientConfig;

    this.timer = nettyClientConfig.getTimer();
    this.bossExecutor = nettyClientConfig.getBossExecutor();
    this.workerExecutor = nettyClientConfig.getWorkerExecutor();
    this.defaultSocksProxyAddress = nettyClientConfig.getDefaultSocksProxyAddress();

    int bossThreadCount = nettyClientConfig.getBossThreadCount();
    int workerThreadCount = nettyClientConfig.getWorkerThreadCount();

    NioWorkerPool workerPool = new NioWorkerPool(workerExecutor, workerThreadCount, ThreadNameDeterminer.CURRENT);
    NioClientBossPool bossPool = new NioClientBossPool(bossExecutor, bossThreadCount, timer, ThreadNameDeterminer.CURRENT);

    this.channelFactory = new NioClientSocketChannelFactory(bossPool, workerPool);
}
Copy the code

From the argument constructor, we get the properties assigned to the object by the Netty related parameters from the Netty configuration class. Then create the netty client key components.

If the method is an empty parameter, the default netty configuration parameter is used to take a look at what the build method does.

public NettyClientConfig build(a) {
    Timer timer = getTimer();
    ExecutorService bossExecutor = getBossExecutor();
    int bossThreadCount = getBossThreadCount();
    ExecutorService workerExecutor = getWorkerExecutor();
    int workerThreadCount = getWorkerThreadCount();

    return newNettyClientConfig( getBootstrapOptions(), defaultSocksProxyAddress, timer ! =null ? timer : new NiftyTimer(threadNamePattern("")), bossExecutor ! =null? bossExecutor : buildDefaultBossExecutor(), bossThreadCount, workerExecutor ! =null ? workerExecutor : buildDefaultWorkerExecutor(),
            workerThreadCount
    );
}
Copy the code

The boss pool thread count is 1, the worker pool thread count is 2, and all other parameters are null. So the Timer and two thread pools are rebuilt when NettyClientConfig is built. The thread pool method newCachedThreadPool uses the custom Thread pool factory (Guava provides ThreadFactoryBuilder) to return a suitable name for the thread pool using the Executors tool class.

private ExecutorService buildDefaultBossExecutor(a){
    return newCachedThreadPool(renamingDaemonThreadFactory(threadNamePattern("-boss-%s")));
}

private ExecutorService buildDefaultWorkerExecutor(a) {
    return newCachedThreadPool(renamingDaemonThreadFactory(threadNamePattern("-worker-%s")));
}

private ThreadFactory renamingDaemonThreadFactory(String nameFormat) {
    return new ThreadFactoryBuilder().setNameFormat(nameFormat).setDaemon(true).build();
}
Copy the code

NiftyClient mainly provides one function — to get connections, including asynchronous and synchronous connections. The FramedClientChannel mentioned above is actually created by FramedClientConnector, but put into the Future.

ThriftClientManager

The name looks like the client manager, the main internal holding codec manager and NiftyClient two properties.

private final ThriftCodecManager codecManager;
private final NiftyClient niftyClient;
Copy the code

This is set at construction time, and most of the time you just need to write New ThriftClientManager(), which is created by default. This class provides the createClient method to create a client (proxy object) that can be used to make remote method calls.

Client creation and data sending process analysis

We’ll start with manager.createclient (connector, echoService.iface. Class). After setting some default timeout parameters, we’ll simplify a bit

public <T, C extends NiftyClientChannel> ListenableFuture<T> createClient(
            final NiftyClientConnector<C> connector,
            final Class<T> type,
            @Nullable final Duration connectTimeout,
            @Nullable final Duration receiveTimeout,
            @Nullable final Duration readTimeout,
            @Nullable final Duration writeTimeout,
            final int maxFrameSize,
            @Nullable final String clientName,
            final List<? extends ThriftClientEventHandler> eventHandlers,
            @Nullable InetSocketAddress socksProxy)
    {
        
        Future
      
        final ListenableFuture<C> connectFuture = niftyClient.connectAsync(
                connector,
                connectTimeout,
                receiveTimeout,
                readTimeout,
                writeTimeout,
                maxFrameSize,
                socksProxy);

        // (2) Obtain the client proxy object after conversion
        ListenableFuture<T> clientFuture = Futures.transform(connectFuture, new Function<C, T>() {
            @Override
            public T apply(@NotNull C channel) {
                String name = Strings.isNullOrEmpty(clientName) ? connector.toString() : clientName;
                return createClient(channel, type, name, eventHandlers);
            }
        }, Runnable::run);

        return clientFuture;
    }

Copy the code

There are two main steps, obtaining the FramerClientChannel followed by obtaining the client proxy object.

Let’s start with part 1: How to get a Future

ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOptions(nettyClientConfig.getBootstrapOptions());

bootstrap.setPipelineFactory(clientChannelConnector.newChannelPipelineFactory(maxFrameSize, nettyClientConfig));
ChannelFuture nettyChannelFuture = clientChannelConnector.connect(bootstrap);
Copy the code

The first part of the netty component setup, let’s look at clientChannelConnector(FrameClientConnector) provides two methods NiftyClient:

public ChannelFuture connect(ClientBootstrap bootstrap){
    return bootstrap.connect(address);
}

@Override
public ChannelPipelineFactory newChannelPipelineFactory(final int maxFrameSize, final NettyClientConfig clientConfig)
{
    return new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline(a)
                throws Exception {
            ChannelPipeline cp = Channels.pipeline();
            TimeoutHandler.addToPipeline(cp);
            cp.addLast("frameEncoder".new LengthFieldPrepender(LENGTH_FIELD_LENGTH));
            cp.addLast(
                    "frameDecoder".new LengthFieldBasedFrameDecoder(
                            maxFrameSize,
                            LENGTH_FIELD_OFFSET,
                            LENGTH_FIELD_LENGTH,
                            LENGTH_ADJUSTMENT,
                            INITIAL_BYTES_TO_STRIP));
            cp.addLast("clientMessage".new ClientMessageHandler());
            if(clientHeader ! =null) {
                clientHeader.createHandler(cp);
            }
            returncp; }}; }Copy the code

The connect method is provided by Netty. NewChannelPipelineFactory is build netty pipelie factory, will add the various internal processor, temporarily don’t go too much explanation.

Keep reading

nettyChannelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.getChannel();
        if(channel ! =null&& channel.isOpen()) { allChannels.add(channel); }}});return new TNiftyFuture<>(clientChannelConnector,
                            receiveTimeout,
                            readTimeout,
                            sendTimeout,
                            nettyChannelFuture);
Copy the code

Add a listener to the channelGroup. And then create TNiftyFuture, this is the key step, so click on it

private class TNiftyFuture<T extends NiftyClientChannel> extends AbstractFuture<T> {
    private TNiftyFuture(final NiftyClientConnector<T> clientChannelConnector,
                            @Nullable final Duration receiveTimeout,
                            @Nullable final Duration readTimeout,
                            @Nullable final Duration sendTimeout,
                            final ChannelFuture channelFuture)
    {
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if(future.isSuccess()) { Channel nettyChannel = future.getChannel(); T channel = clientChannelConnector.newThriftClientChannel(nettyChannel, nettyClientConfig); channel.setReceiveTimeout(receiveTimeout); channel.setReadTimeout(readTimeout); channel.setSendTimeout(sendTimeout); set(channel); }}}); }}Copy the code

After the connection is established, a listener is added to the constructor. After the connection is established, a channel is acquired, through which a ThriftChannel, in this case FramedClientChannel, is created, and set to the future.

Take a look at the final method offered by clientChannelConnector, newThriftClientChannel

public FramedClientChannel newThriftClientChannel(Channel nettyChannel, NettyClientConfig clientConfig){
    FramedClientChannel channel = new FramedClientChannel(nettyChannel, clientConfig.getTimer(), getProtocolFactory());
    ChannelPipeline cp = nettyChannel.getPipeline();
    cp.addLast("thriftHandler", channel);
    return channel;
}
Copy the code

Build the nettyChannel, build the FramedClientChannel, and add it to the pipeline, so FramedClientChannel must also be a processor.

FramedClientChannel is important because some of the channel’s operations, such as receiving requests for processing, sending requests, etc., are derived from AbstractClientChannel (processor).

Now I have got thriftChannel, that is, step (1) has been introduced. Look at (2) creating a client proxy object.

// (2) Obtain the client proxy object after conversion
ListenableFuture<T> clientFuture = Futures.transform(connectFuture, new Function<C, T>() {
    @Override
    public T apply(@NotNull C channel) {
        String name = Strings.isNullOrEmpty(clientName) ? connector.toString() : clientName;
        return createClient(channel, type, name, eventHandlers);
    }
}, Runnable::run);
Copy the code

The Transform method is provided by Guava to perform future conversions, converting a Future to a Future, in this case converting a FrameClientChannel to EchoService.iface (which is actually a proxy object). The createClient method:

ThriftClientManager:

private final LoadingCache<TypeAndName, ThriftClientMetadata> clientMetadataCache = CacheBuilder.newBuilder()
            .build(new CacheLoader<TypeAndName, ThriftClientMetadata>()
            {
                @Override
                public ThriftClientMetadata load(TypeAndName typeAndName)
                        throws Exception
                {
                    return newThriftClientMetadata(typeAndName.getType(), typeAndName.getName(), codecManager); }});public <T> T createClient(RequestChannel channel, Class<T> type, String name, List<? extends ThriftClientEventHandler> eventHandlers) {   
        ThriftClientMetadata clientMetadata = clientMetadataCache.getUnchecked(new TypeAndName(type, name));

        String clientDescription = clientMetadata.getName() + "" + channel.toString();

        ThriftInvocationHandler handler = new ThriftInvocationHandler(clientDescription, channel,
                clientMetadata.getMethodHandlers(),
                ImmutableList.<ThriftClientEventHandler>builder().addAll(globalEventHandlers).addAll(eventHandlers).build());

        return type.cast(Proxy.newProxyInstance(
                type.getClassLoader(),
                newClass<? >[]{ type, Closeable.class }, handler )); }Copy the code

ClientMetadataCache is a local cache created using guava Cache. The key is TypeAndName and value is ThriftClientMetadata. ThriftClientMetadata is similar to ThriftServiceMetadata. It contains ThriftServiceMetadata, Type, and name. Private final Map

methodHandlers; These are all initialized when the object is created.
,>

The rest is to create the proxy object using the dynamic proxy, so you can guess that ThriftInvocationHandler inherits from InvocationHandler.

The method call

private static class ThriftInvocationHandler implements InvocationHandler
{
    private static final Object[] NO_ARGS = new Object[0];
    private final String clientDescription;

    private final RequestChannel channel;

    private final Map<Method, ThriftMethodHandler> methods;
    private static final AtomicInteger sequenceIdCursor = new AtomicInteger(1);
    private final List<? extends ThriftClientEventHandler> eventHandlers;

    private ThriftInvocationHandler( String clientDescription, RequestChannel channel, Map
       
         methods, List
         eventHandlers)
       ,>
    {
        this.clientDescription = clientDescription;
        this.channel = channel;
        this.methods = methods;
        this.eventHandlers = eventHandlers;

    }

    public RequestChannel getChannel(a){
        return channel;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

        int sequenceId = sequenceIdCursor.getAndIncrement();
        TChannelBufferInputTransport inputTransport = new TChannelBufferInputTransport();
        TChannelBufferOutputTransport outputTransport = new TChannelBufferOutputTransport();

        TTransportPair transportPair = fromSeparateTransports(inputTransport, outputTransport);
        TProtocolPair protocolPair = channel.getProtocolFactory().getProtocolPair(transportPair);
        TProtocol inputProtocol = protocolPair.getInputProtocol();
        TProtocol outputProtocol = protocolPair.getOutputProtocol();

        ThriftMethodHandler methodHandler = methods.get(method);


        NiftyClientChannel niftyClientChannel = (NiftyClientChannel)channel;
        SocketAddress remoteAddress = niftyClientChannel.getNettyChannel().getRemoteAddress();

        ClientRequestContext requestContext = new NiftyClientRequestContext(inputProtocol, outputProtocol, channel, remoteAddress);
        ClientContextChain context = new ClientContextChain(eventHandlers, methodHandler.getQualifiedName(), requestContext);

        returnmethodHandler.invoke(channel,inputTransport inputTransport, outputTransport, inputProtocol, outputProtocol, sequenceId, context, args); }}Copy the code

When a method call is made, it goes to the Invoke method, as anyone who knows Java reflection knows. The Invoke method is basically three steps

  • Create the input and output protocol, here is the TBinaryProtocal, held by the internal transport TChannelBufferInputTransport and TChannelBufferOutputTransport, can be understood as the transport layer components, Internal hold ChannelBuffer to store data;
  • Get ThriftMethodHandler according to Method;
  • Get the address, call ThriftMethodHandler. Invoke method;

Methodhandler. invoke keeps going and discovers that there are synchronous and asynchronous calls, and we’ll be going synchronous here.

private Object synchronousInvoke(
            RequestChannel channel,
            TChannelBufferInputTransport inputTransport,
            TChannelBufferOutputTransport outputTransport,
            TProtocol inputProtocol,
            TProtocol outputProtocol,
            int sequenceId,
            ClientContextChain contextChain,
            Object[] args) throws Exception
    {
        Object results = null;

        // write request
        outputTransport.resetOutputBuffer();
        writeArguments(outputProtocol, sequenceId, args);
        
        ChannelBuffer requestBuffer = outputTransport.getOutputBuffer();

        ClientMessage clientMessage = new ClientMessage(methodMetadata.getServiceFullName(), sequenceId, requestBuffer, name);
        ChannelBuffer responseBuffer = SyncClientHelpers.sendSynchronousTwoWayMessage(channel, clientMessage);

        // read results
        inputTransport.setInputBuffer(responseBuffer);
        waitForResponse(inputProtocol, sequenceId);
        results = readResponse(inputProtocol);

        return results;
    }
Copy the code

First call outputTransport. ResetOutputBuffer (); To clear the channelBuffer and reset some Pointers.

Use writeArguments(outputProtocol, sequenceId, ARgs); To loop the method arguments to the outputTransport held in the outputProtocol, which is held internally in the ChannelBuffer. This section has previously written similar server-side read data, refer to server-side read data

OutputTransport. At this time in the outBuffer already has data, through outputTransport. GetOutputBuffer (); To get the data and build ClientMessage based on it.

Then use SyncClientHelpers. SendSynchronousTwoWayMessage (channel, clientMessage) data sent to the server, get responseBuffer server-side results.

Finally, the server’s buffer is parsed and the result is returned. This part is similar to writing data and will not be discussed much. The parameter server reads the data

We mainly sends the data SyncClientHelpers. SendSynchronousTwoWayMessage (channel, clientMessage); This part of the implementation

SyncClientHelpers:

public static ChannelBuffer sendSynchronousTwoWayMessage(RequestChannel channel, final ClientMessage request)
            throws TException, InterruptedException {
    final ChannelBuffer[] responseHolder = new ChannelBuffer[1];
    final TException[] exceptionHolder = new TException[1];
    final CountDownLatch latch = new CountDownLatch(1);

    responseHolder[0] = null;
    exceptionHolder[0] = null;

    channel.sendAsynchronousRequest(request, false.new RequestChannel.Listener()
    {
        @Override
        public void onRequestSent(a)
        {}@Override
        public void onResponseReceived(ChannelBuffer response)
        {
            responseHolder[0] = response;
            latch.countDown();
        }

        @Override
        public void onChannelError(TException e)
        {
            exceptionHolder[0] = e; latch.countDown(); }}); latch.await();if (exceptionHolder[0] != null) {
        throw exceptionHolder[0];
    }

    return responseHolder[0];
}
Copy the code

To send a request through a channel, you need to send it to a listener. Notice that the onResponseReceived method sets the result of the response and calls latch.countdown () at the same time, await stops blocking and the method returns the result. Let’s just focus on the implementation of sendAsynchronousRequest.

FramedClientChannel:

public void sendAsynchronousRequest(final ClientMessage message,
                                    final boolean oneway,
                                    final Listener listener) throws TException{
    final int sequenceId = message.getSeqid(); // Get the message ID
    Request = new Request(listener);
    // requestMap.put(sequenceId, request); The subsequent response will remove the request based on the message ID
    final Request request = makeRequest(sequenceId, listener, oneway);

    // To send a message, call FramedClientChannel's writeRequest, as mentioned earlier, call netty.channle's write method
    ChannelFuture sendFuture = writeRequest(message);
    queueSendTimeout(request);
}
Copy the code

To this message is sent out, perhaps you may be curious, not to say that sendSynchronousTwoWayMessage will return the result of waiting for the news? Yes, the latch here is very cleverly explained in the processor AbstractClientChannel(the parent of FramedClientChannel)

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    ChannelBuffer response = extractResponse(e.getMessage());

    int sequenceId = extractSequenceId(response);
    onResponseReceived(sequenceId, response);
}
Copy the code

Firstly, the ChannelBuffer is extracted from the server response. Then get the serial number; Continue to see onResponseReceived

private void onResponseReceived(int sequenceId, final ChannelBuffer response) {
    final Request request = requestMap.remove(sequenceId);
    executorService.execute(new Runnable() {
        @Override
        public void run(a) { fireResponseReceivedCallback(request.getListener(), response); }}); }Copy the code

Obtain the previously set Request (internal Listener) from the previous map based on the serial number, and then use the thread to asynchronously execute the task; Keep following

private void fireResponseReceivedCallback(Listener listener, ChannelBuffer response){
    listener.onResponseReceived(response);
}
Copy the code

See here to perform the listener listener. OnResponseReceived, SyncClientHelpers. At this time sendSynchronousTwoWayMessage end block returns the result.

If you go here and make a remote method call, you send a request to get the result, and you parse the result.