Creating a client and making a remote call
FramedClientConnector connector = new FramedClientConnector(new InetSocketAddress(8081));
ThriftClientManager manager = new ThriftClientManager(new ThriftCodecManager(), new NiftyClient(), ImmutableSet.of());
EchoService.Iface client = manager.createClient(connector, EchoService.Iface.class).get();
String answer = client.echo("abc");
System.out.println(answer);
Creating the client yields a proxy object implementing EchoService.Iface, whose methods can then be called like local methods to make remote calls. The core components visible in the code are:
- FramedClientConnector: connection related, covering the protocol, the channel, and so on
- NiftyClient: the client itself
- ThriftClientManager: client management
A quick look at the important components
This section briefly covers what each component does; the details come up later when we analyze how the client is created.
FramedClientConnector
The client connector holds a SocketAddress and a TDuplexProtocolFactory, and provides three important methods:
- newThriftClientChannel: obtains a FramedClientChannel, through which requests are written out;
- newChannelPipelineFactory: obtains the Netty pipeline factory used later when creating the Netty client;
- connect: connects the Netty client to the server.
The top-level interface is as follows:
public interface NiftyClientConnector<T extends RequestChannel> {
    ChannelFuture connect(ClientBootstrap bootstrap);
    T newThriftClientChannel(Channel channel, NettyClientConfig clientConfig);
    ChannelPipelineFactory newChannelPipelineFactory(int maxFrameSize, NettyClientConfig clientConfig);
}
As you can see, the last two methods are Netty related, while the first one wraps the Netty channel and writes requests through it. Note also that FramedClientChannel extends AbstractClientChannel, which is itself a Netty handler; this becomes important later.
NiftyClient
As the name suggests, this class is the Nifty client. Its fields are mainly Netty related: the boss thread pool, the worker thread pool, a ChannelGroup holding the channels, the Netty configuration class NettyClientConfig, and a NioClientSocketChannelFactory. Let's start with the constructors:
public NiftyClient()
{
    this(NettyClientConfig.newBuilder().build());
}

public NiftyClient(NettyClientConfig nettyClientConfig)
{
    this.nettyClientConfig = nettyClientConfig;
    this.timer = nettyClientConfig.getTimer();
    this.bossExecutor = nettyClientConfig.getBossExecutor();
    this.workerExecutor = nettyClientConfig.getWorkerExecutor();
    this.defaultSocksProxyAddress = nettyClientConfig.getDefaultSocksProxyAddress();
    int bossThreadCount = nettyClientConfig.getBossThreadCount();
    int workerThreadCount = nettyClientConfig.getWorkerThreadCount();
    NioWorkerPool workerPool = new NioWorkerPool(workerExecutor, workerThreadCount, ThreadNameDeterminer.CURRENT);
    NioClientBossPool bossPool = new NioClientBossPool(bossExecutor, bossThreadCount, timer, ThreadNameDeterminer.CURRENT);
    this.channelFactory = new NioClientSocketChannelFactory(bossPool, workerPool);
}
The parameterized constructor copies the Netty related settings from the configuration class into the object's fields and then creates the key Netty client components.
The no-argument constructor falls back to the default Netty configuration; let's see what the build method does.
public NettyClientConfig build()
{
    Timer timer = getTimer();
    ExecutorService bossExecutor = getBossExecutor();
    int bossThreadCount = getBossThreadCount();
    ExecutorService workerExecutor = getWorkerExecutor();
    int workerThreadCount = getWorkerThreadCount();

    return new NettyClientConfig(
            getBootstrapOptions(),
            defaultSocksProxyAddress,
            timer != null ? timer : new NiftyTimer(threadNamePattern("")),
            bossExecutor != null ? bossExecutor : buildDefaultBossExecutor(),
            bossThreadCount,
            workerExecutor != null ? workerExecutor : buildDefaultWorkerExecutor(),
            workerThreadCount
    );
}
The boss pool thread count defaults to 1 and the worker pool thread count to 2; all the other parameters are null, so the Timer and the two executors are built fresh when NettyClientConfig is constructed. The default executors come from Executors.newCachedThreadPool with a custom thread factory (Guava's ThreadFactoryBuilder), which gives the threads readable names.
private ExecutorService buildDefaultBossExecutor()
{
    return newCachedThreadPool(renamingDaemonThreadFactory(threadNamePattern("-boss-%s")));
}

private ExecutorService buildDefaultWorkerExecutor()
{
    return newCachedThreadPool(renamingDaemonThreadFactory(threadNamePattern("-worker-%s")));
}

private ThreadFactory renamingDaemonThreadFactory(String nameFormat)
{
    return new ThreadFactoryBuilder().setNameFormat(nameFormat).setDaemon(true).build();
}
NiftyClient mainly provides one thing: obtaining connections, both asynchronously and synchronously. The FramedClientChannel mentioned above is actually created by the FramedClientConnector and is handed back wrapped in a Future.
ThriftClientManager
As the name suggests, this is the client manager. It mainly holds two fields: the codec manager and the NiftyClient.
private final ThriftCodecManager codecManager;
private final NiftyClient niftyClient;
Both are set at construction time; most of the time writing new ThriftClientManager() is enough and the defaults are created for you. The class provides the createClient method, which returns the client (a proxy object) used to make remote method calls.
Client creation and data sending process analysis
We start from manager.createClient(connector, EchoService.Iface.class). After some default timeout parameters are filled in, the call lands in the method below (slightly simplified):
public <T, C extends NiftyClientChannel> ListenableFuture<T> createClient(
        final NiftyClientConnector<C> connector,
        final Class<T> type,
        @Nullable final Duration connectTimeout,
        @Nullable final Duration receiveTimeout,
        @Nullable final Duration readTimeout,
        @Nullable final Duration writeTimeout,
        final int maxFrameSize,
        @Nullable final String clientName,
        final List<? extends ThriftClientEventHandler> eventHandlers,
        @Nullable InetSocketAddress socksProxy)
{
    // (1) Obtain the Future holding the connection (the FramedClientChannel)
    final ListenableFuture<C> connectFuture = niftyClient.connectAsync(
            connector,
            connectTimeout,
            receiveTimeout,
            readTimeout,
            writeTimeout,
            maxFrameSize,
            socksProxy);

    // (2) Transform it into a Future holding the client proxy object
    ListenableFuture<T> clientFuture = Futures.transform(connectFuture, new Function<C, T>() {
        @Override
        public T apply(@NotNull C channel) {
            String name = Strings.isNullOrEmpty(clientName) ? connector.toString() : clientName;
            return createClient(channel, type, name, eventHandlers);
        }
    }, Runnable::run);

    return clientFuture;
}
There are two main steps: first obtain the FramedClientChannel, then obtain the client proxy object.
Let's start with step (1): how the connection Future is obtained.
ClientBootstrap bootstrap = new ClientBootstrap(channelFactory);
bootstrap.setOptions(nettyClientConfig.getBootstrapOptions());
bootstrap.setPipelineFactory(clientChannelConnector.newChannelPipelineFactory(maxFrameSize, nettyClientConfig));
ChannelFuture nettyChannelFuture = clientChannelConnector.connect(bootstrap);
This is standard Netty bootstrap setup. The clientChannelConnector (our FramedClientConnector) provides two of the methods NiftyClient uses here:
public ChannelFuture connect(ClientBootstrap bootstrap)
{
    return bootstrap.connect(address);
}

@Override
public ChannelPipelineFactory newChannelPipelineFactory(final int maxFrameSize, final NettyClientConfig clientConfig)
{
    return new ChannelPipelineFactory() {
        @Override
        public ChannelPipeline getPipeline()
                throws Exception
        {
            ChannelPipeline cp = Channels.pipeline();
            TimeoutHandler.addToPipeline(cp);
            cp.addLast("frameEncoder", new LengthFieldPrepender(LENGTH_FIELD_LENGTH));
            cp.addLast(
                    "frameDecoder",
                    new LengthFieldBasedFrameDecoder(
                            maxFrameSize,
                            LENGTH_FIELD_OFFSET,
                            LENGTH_FIELD_LENGTH,
                            LENGTH_ADJUSTMENT,
                            INITIAL_BYTES_TO_STRIP));
            cp.addLast("clientMessage", new ClientMessageHandler());
            if (clientHeader != null) {
                clientHeader.createHandler(cp);
            }
            return cp;
        }
    };
}
The connect method simply delegates to Netty. newChannelPipelineFactory builds the Netty pipeline factory and registers the various handlers; we won't go into them for now.
Keep reading
nettyChannelFuture.addListener(new ChannelFutureListener() {
    @Override
    public void operationComplete(ChannelFuture future) throws Exception {
        Channel channel = future.getChannel();
        if (channel != null && channel.isOpen()) {
            allChannels.add(channel);
        }
    }
});

return new TNiftyFuture<>(clientChannelConnector,
        receiveTimeout,
        readTimeout,
        sendTimeout,
        nettyChannelFuture);
A listener is added that puts the channel into the channelGroup once it is open. Then a TNiftyFuture is created, which is the key step, so let's look inside it.
private class TNiftyFuture<T extends NiftyClientChannel> extends AbstractFuture<T>
{
    private TNiftyFuture(final NiftyClientConnector<T> clientChannelConnector,
                         @Nullable final Duration receiveTimeout,
                         @Nullable final Duration readTimeout,
                         @Nullable final Duration sendTimeout,
                         final ChannelFuture channelFuture)
    {
        channelFuture.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    Channel nettyChannel = future.getChannel();
                    T channel = clientChannelConnector.newThriftClientChannel(nettyChannel, nettyClientConfig);
                    channel.setReceiveTimeout(receiveTimeout);
                    channel.setReadTimeout(readTimeout);
                    channel.setSendTimeout(sendTimeout);
                    set(channel);
                }
            }
        });
    }
}
The constructor adds a listener to the channel future. Once the connection is established, the Netty channel is taken from the future, a Thrift channel (here a FramedClientChannel) is created around it, and the result is set on the future.
Now take a look at the last method provided by clientChannelConnector, newThriftClientChannel:
public FramedClientChannel newThriftClientChannel(Channel nettyChannel, NettyClientConfig clientConfig)
{
    FramedClientChannel channel = new FramedClientChannel(nettyChannel, clientConfig.getTimer(), getProtocolFactory());
    ChannelPipeline cp = nettyChannel.getPipeline();
    cp.addLast("thriftHandler", channel);
    return channel;
}
It wraps the Netty channel in a new FramedClientChannel and adds that channel to the pipeline, so FramedClientChannel must itself be a handler.
FramedClientChannel matters because the channel operations, such as sending requests and handling the responses that come back, are inherited from AbstractClientChannel (the handler it extends).
At this point we have the Thrift channel, which wraps up step (1). Now look at step (2), creating the client proxy object.
// (2) Transform it into a Future holding the client proxy object
ListenableFuture<T> clientFuture = Futures.transform(connectFuture, new Function<C, T>() {
    @Override
    public T apply(@NotNull C channel) {
        String name = Strings.isNullOrEmpty(clientName) ? connector.toString() : clientName;
        return createClient(channel, type, name, eventHandlers);
    }
}, Runnable::run);
The transform method is provided by Guava to convert one future into another; here a Future holding the FramedClientChannel is turned into a Future holding EchoService.Iface (which is actually a proxy object).
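To make the conversion step concrete, here is a minimal, Nifty-independent sketch of Guava's Futures.transform; the TransformDemo class and the values in it are made up purely for illustration.

import com.google.common.base.Function;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.SettableFuture;

public class TransformDemo {
    public static void main(String[] args) throws Exception {
        // The source future; in Nifty this is the future holding the FramedClientChannel
        SettableFuture<Integer> source = SettableFuture.create();

        // Map it to a future of another type; the function runs once the source completes
        ListenableFuture<String> mapped = Futures.transform(source,
                new Function<Integer, String>() {
                    @Override
                    public String apply(Integer value) {
                        return "client built from " + value;
                    }
                },
                MoreExecutors.directExecutor());

        source.set(42);                   // complete the source future
        System.out.println(mapped.get()); // prints: client built from 42
    }
}

The conversion function in Nifty simply delegates to the createClient method: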
ThriftClientManager:
private final LoadingCache<TypeAndName, ThriftClientMetadata> clientMetadataCache = CacheBuilder.newBuilder()
        .build(new CacheLoader<TypeAndName, ThriftClientMetadata>()
        {
            @Override
            public ThriftClientMetadata load(TypeAndName typeAndName)
                    throws Exception
            {
                return new ThriftClientMetadata(typeAndName.getType(), typeAndName.getName(), codecManager);
            }
        });

public <T> T createClient(RequestChannel channel, Class<T> type, String name, List<? extends ThriftClientEventHandler> eventHandlers)
{
    ThriftClientMetadata clientMetadata = clientMetadataCache.getUnchecked(new TypeAndName(type, name));
    String clientDescription = clientMetadata.getName() + " " + channel.toString();
    ThriftInvocationHandler handler = new ThriftInvocationHandler(clientDescription, channel,
            clientMetadata.getMethodHandlers(),
            ImmutableList.<ThriftClientEventHandler>builder().addAll(globalEventHandlers).addAll(eventHandlers).build());
    return type.cast(Proxy.newProxyInstance(
            type.getClassLoader(),
            new Class<?>[]{ type, Closeable.class },
            handler));
}
clientMetadataCache is a local cache built with Guava Cache; the key is a TypeAndName and the value a ThriftClientMetadata. ThriftClientMetadata is the client-side counterpart of ThriftServiceMetadata: it holds the ThriftServiceMetadata, the type, the name and a private final Map<Method, ThriftMethodHandler> methodHandlers, all of which are initialized when the object is created.
The rest is creating the proxy object with a JDK dynamic proxy, so you can guess that ThriftInvocationHandler implements InvocationHandler.
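For readers less familiar with JDK dynamic proxies, here is a minimal, self-contained illustration of that mechanism; the Echo interface and the behaviour inside the handler are made up for the demo and have nothing to do with Nifty's real serialization logic, but every call on the proxy is routed into invoke() in exactly the same way.

import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

public class ProxyDemo {
    // A hypothetical service interface, standing in for EchoService.Iface
    interface Echo {
        String echo(String message);
    }

    public static void main(String[] args) {
        // Every call on the proxy lands in invoke(); Nifty's ThriftInvocationHandler
        // serializes the arguments here and writes them to the channel, we just fake a reply
        InvocationHandler handler = (proxy, method, methodArgs) ->
                "remote " + method.getName() + " -> " + methodArgs[0];

        Echo client = (Echo) Proxy.newProxyInstance(
                Echo.class.getClassLoader(),
                new Class<?>[]{Echo.class},
                handler);

        System.out.println(client.echo("abc")); // prints: remote echo -> abc
    }
}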
The method call
private static class ThriftInvocationHandler implements InvocationHandler
{
    private static final Object[] NO_ARGS = new Object[0];
    private final String clientDescription;
    private final RequestChannel channel;
    private final Map<Method, ThriftMethodHandler> methods;
    private static final AtomicInteger sequenceIdCursor = new AtomicInteger(1);
    private final List<? extends ThriftClientEventHandler> eventHandlers;

    private ThriftInvocationHandler(
            String clientDescription,
            RequestChannel channel,
            Map<Method, ThriftMethodHandler> methods,
            List<? extends ThriftClientEventHandler> eventHandlers)
    {
        this.clientDescription = clientDescription;
        this.channel = channel;
        this.methods = methods;
        this.eventHandlers = eventHandlers;
    }

    public RequestChannel getChannel()
    {
        return channel;
    }

    @Override
    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable
    {
        int sequenceId = sequenceIdCursor.getAndIncrement();
        TChannelBufferInputTransport inputTransport = new TChannelBufferInputTransport();
        TChannelBufferOutputTransport outputTransport = new TChannelBufferOutputTransport();
        TTransportPair transportPair = fromSeparateTransports(inputTransport, outputTransport);
        TProtocolPair protocolPair = channel.getProtocolFactory().getProtocolPair(transportPair);
        TProtocol inputProtocol = protocolPair.getInputProtocol();
        TProtocol outputProtocol = protocolPair.getOutputProtocol();
        ThriftMethodHandler methodHandler = methods.get(method);
        NiftyClientChannel niftyClientChannel = (NiftyClientChannel) channel;
        SocketAddress remoteAddress = niftyClientChannel.getNettyChannel().getRemoteAddress();
        ClientRequestContext requestContext = new NiftyClientRequestContext(inputProtocol, outputProtocol, channel, remoteAddress);
        ClientContextChain context = new ClientContextChain(eventHandlers, methodHandler.getQualifiedName(), requestContext);
        return methodHandler.invoke(channel, inputTransport, outputTransport, inputProtocol, outputProtocol, sequenceId, context, args);
    }
}
When a method is called on the proxy it lands in invoke, as anyone familiar with Java reflection will expect. invoke boils down to three steps:
- Create the input and output protocols (here TBinaryProtocol), each backed by a transport, TChannelBufferInputTransport and TChannelBufferOutputTransport. These can be understood as transport-layer components that internally hold a ChannelBuffer to store the data (see the sketch after this list);
- Look up the ThriftMethodHandler for the Method;
- Get the remote address and call ThriftMethodHandler.invoke.
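As promised, here is a rough sketch of that transport idea, assuming an older libthrift (the 0.9.x line Nifty builds on) where TTransport only requires the methods shown; the class name and the getOutputBuffer helper are invented for illustration and are not Nifty's actual TChannelBufferOutputTransport.

import org.apache.thrift.transport.TTransport;
import org.apache.thrift.transport.TTransportException;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;

// Output-only transport backed by a Netty 3 ChannelBuffer (illustrative, not Nifty's class)
public class ChannelBufferOutputTransportSketch extends TTransport {
    private final ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();

    @Override public boolean isOpen() { return true; }
    @Override public void open() { }
    @Override public void close() { }

    // An output-only transport never reads
    @Override
    public int read(byte[] buf, int off, int len) throws TTransportException {
        throw new TTransportException("read not supported");
    }

    // The protocol serializes each argument by calling write();
    // the bytes simply accumulate in the ChannelBuffer
    @Override
    public void write(byte[] buf, int off, int len) {
        buffer.writeBytes(buf, off, len);
    }

    // What the invocation handler would fetch to build the ClientMessage
    public ChannelBuffer getOutputBuffer() {
        return buffer;
    }
}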
Following methodHandler.invoke further, we find both synchronous and asynchronous call paths; we take the synchronous one here.
private Object synchronousInvoke(
RequestChannel channel,
TChannelBufferInputTransport inputTransport,
TChannelBufferOutputTransport outputTransport,
TProtocol inputProtocol,
TProtocol outputProtocol,
int sequenceId,
ClientContextChain contextChain,
Object[] args) throws Exception
{
Object results = null;
// write request
outputTransport.resetOutputBuffer();
writeArguments(outputProtocol, sequenceId, args);
ChannelBuffer requestBuffer = outputTransport.getOutputBuffer();
ClientMessage clientMessage = new ClientMessage(methodMetadata.getServiceFullName(), sequenceId, requestBuffer, name);
ChannelBuffer responseBuffer = SyncClientHelpers.sendSynchronousTwoWayMessage(channel, clientMessage);
// read results
inputTransport.setInputBuffer(responseBuffer);
waitForResponse(inputProtocol, sequenceId);
results = readResponse(inputProtocol);
return results;
}
First, outputTransport.resetOutputBuffer() is called to clear the ChannelBuffer and reset its read/write pointers.
Then writeArguments(outputProtocol, sequenceId, args) loops over the method arguments and writes them through the outputProtocol into the outputTransport, i.e. into the ChannelBuffer it holds. This mirrors the server-side reading of data described earlier; refer to that section for details.
At this point the output buffer already contains data; outputTransport.getOutputBuffer() fetches it and a ClientMessage is built from it.
Then SyncClientHelpers.sendSynchronousTwoWayMessage(channel, clientMessage) sends the data to the server and returns the responseBuffer holding the server's result.
Finally, the response buffer is parsed and the result returned. This part is similar to writing the data (and to how the server reads arguments), so we won't dwell on it.
What we mainly want to look at is how SyncClientHelpers.sendSynchronousTwoWayMessage(channel, clientMessage) actually sends the data:
SyncClientHelpers:
public static ChannelBuffer sendSynchronousTwoWayMessage(RequestChannel channel, final ClientMessage request)
        throws TException, InterruptedException
{
    final ChannelBuffer[] responseHolder = new ChannelBuffer[1];
    final TException[] exceptionHolder = new TException[1];
    final CountDownLatch latch = new CountDownLatch(1);
    responseHolder[0] = null;
    exceptionHolder[0] = null;

    channel.sendAsynchronousRequest(request, false, new RequestChannel.Listener()
    {
        @Override
        public void onRequestSent()
        {
        }

        @Override
        public void onResponseReceived(ChannelBuffer response)
        {
            responseHolder[0] = response;
            latch.countDown();
        }

        @Override
        public void onChannelError(TException e)
        {
            exceptionHolder[0] = e;
            latch.countDown();
        }
    });

    latch.await();

    if (exceptionHolder[0] != null) {
        throw exceptionHolder[0];
    }
    return responseHolder[0];
}
The request is sent through the channel together with a listener. Notice that onResponseReceived stores the response and calls latch.countDown(), at which point await() stops blocking and the method can return the result.
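This blocking trick is worth isolating. Below is a stripped-down, self-contained sketch of the same pattern, not the library code; the LatchDemo class, its fake sendAsync and the Thread it spawns are invented for the demo. An asynchronous callback is turned into a synchronous call by parking the caller on a CountDownLatch until the callback fires.

import java.util.concurrent.CountDownLatch;

public class LatchDemo {
    interface Listener {
        void onResponseReceived(String response);
    }

    // Hypothetical asynchronous sender: the callback fires on another thread
    static void sendAsync(String request, Listener listener) {
        new Thread(() -> listener.onResponseReceived("echo: " + request)).start();
    }

    // Turn the asynchronous callback into a synchronous call
    static String sendSync(String request) throws InterruptedException {
        final String[] responseHolder = new String[1];
        final CountDownLatch latch = new CountDownLatch(1);

        sendAsync(request, response -> {
            responseHolder[0] = response; // stash the result for the blocked caller
            latch.countDown();            // release the waiting thread
        });

        latch.await();                    // block until the callback has run
        return responseHolder[0];
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(sendSync("abc")); // prints: echo: abc
    }
}

Now back to the real implementation of sendAsynchronousRequest.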
FramedClientChannel:
public void sendAsynchronousRequest(final ClientMessage message,
                                    final boolean oneway,
                                    final Listener listener) throws TException
{
    final int sequenceId = message.getSeqid(); // get the message sequence ID
    // makeRequest wraps the listener in a Request and puts it into requestMap keyed by
    // sequenceId; the later response uses the same ID to find and remove the request
    final Request request = makeRequest(sequenceId, listener, oneway);
    // Send the message: FramedClientChannel's writeRequest ultimately calls the Netty channel's write method
    ChannelFuture sendFuture = writeRequest(message);
    queueSendTimeout(request);
}
At this point the message has been sent. You might wonder: wasn't sendSynchronousTwoWayMessage supposed to wait for the result? Yes, and this is where the latch is cleverly released, in the handler AbstractClientChannel (the parent of FramedClientChannel):
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
ChannelBuffer response = extractResponse(e.getMessage());
int sequenceId = extractSequenceId(response);
onResponseReceived(sequenceId, response);
}
Copy the code
First the ChannelBuffer is extracted from the server's response, then the sequence ID is read from it; continue into onResponseReceived:
private void onResponseReceived(int sequenceId, final ChannelBuffer response)
{
    final Request request = requestMap.remove(sequenceId);
    executorService.execute(new Runnable() {
        @Override
        public void run()
        {
            fireResponseReceivedCallback(request.getListener(), response);
        }
    });
}
The Request (which holds the listener) stored earlier is removed from the map by its sequence ID, and the callback is then run asynchronously on a worker thread; keep following:
private void fireResponseReceivedCallback(Listener listener, ChannelBuffer response){
listener.onResponseReceived(response);
}
This is where the listener's onResponseReceived is finally invoked; at that moment SyncClientHelpers.sendSynchronousTwoWayMessage stops blocking and returns the result.
That completes a remote method call: the request is sent, the response comes back, and the response is parsed into the result.