Introduction

In the previous articles we built the basic pieces:

- Java hand-written RPC (01): socket implementation from scratch
- Java hand-written RPC (02): Netty4 client and server from scratch

Now that the client and server are written, how do we implement an actual client-server invocation?

Let's take a look.
The interface definition
Calculation method
package com.github.houbb.rpc.common.service;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
/**
 * <p> Calculation service interface </p>
 *
 * <pre> Created: 2018/8/24 </pre>
 * <pre> Project: fake </pre>
 *
 * @author houbinbin
 * @since 0.0.1
 */
public interface Calculator {
    /**
     * Computes the sum.
     *
     * @param request request parameter
     * @return calculation result
     */
    CalculateResponse sum(final CalculateRequest request);
}
pojo
The corresponding parameter objects:
- CalculateRequest
package com.github.houbb.rpc.common.model;
import java.io.Serializable;
/**
 * <p> Request parameter </p>
 *
 * <pre> Created: 2018/8/24 5:05 PM </pre>
 * <pre> Project: fake </pre>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public class CalculateRequest implements Serializable {
    private static final long serialVersionUID = 6420751004355300996L;

    /** the first number */
    private int one;

    /** the second number */
    private int two;

    public CalculateRequest() {
    }

    public CalculateRequest(int one, int two) {
        this.one = one;
        this.two = two;
    }

    // getter, setter, toString omitted
}
- CalculateResponse
package com.github.houbb.rpc.common.model;
import java.io.Serializable;
/**
 * <p> Response result </p>
 *
 * <pre> Created: 2018/8/24 5:05 PM </pre>
 * <pre> Project: fake </pre>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public class CalculateResponse implements Serializable {
    private static final long serialVersionUID = -1972014736222511341L;

    /** whether the calculation succeeded */
    private boolean success;

    /** the sum of the two numbers */
    private int sum;

    public CalculateResponse() {
    }

    public CalculateResponse(boolean success, int sum) {
        this.success = success;
        this.sum = sum;
    }

    // getter, setter, toString omitted
}
The client
The core part

RpcClient needs to register the corresponding handlers, as follows:
Bootstrap bootstrap = new Bootstrap();
ChannelFuture channelFuture = bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.SO_KEEPALIVE, true)
.handler(new ChannelInitializer<Channel>(){
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new LoggingHandler(LogLevel.INFO))
.addLast(new CalculateRequestEncoder())
.addLast(new CalculateResponseDecoder())
.addLast(new RpcClientHandler());
}
})
.connect(RpcConstant.ADDRESS, port)
.syncUninterruptibly();
Netty's handler pipeline is elegantly designed, so our code can be extended with great flexibility; an extra handler can simply be slotted in, as sketched below.
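For example, here is a minimal sketch of how one more handler could be added to the same client pipeline. IdleStateHandler is a standard Netty handler (io.netty.handler.timeout.IdleStateHandler); using it here is only an illustration and is not part of the original project.

// Sketch only: extending the pipeline without touching the existing handlers.
ch.pipeline()
        .addLast(new LoggingHandler(LogLevel.INFO))
        // illustration: fire idle events after 60 seconds of read/write inactivity
        .addLast(new IdleStateHandler(60, 60, 60))
        .addLast(new CalculateRequestEncoder())
        .addLast(new CalculateResponseDecoder())
        .addLast(new RpcClientHandler());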
Let’s look at the corresponding implementation.
RpcClientHandler
package com.github.houbb.rpc.client.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.client.core.RpcClient;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * <p> Client handler </p>
 *
 * <pre> Created: 2019/10/16 11:30 PM </pre>
 * <pre> Project: rpc </pre>
 *
 * @author houbinbin
 * @since 0.0.3
 */
public class RpcClientHandler extends SimpleChannelInboundHandler {
    private static final Log log = LogFactory.getLog(RpcClient.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        CalculateRequest request = new CalculateRequest(1, 2);
        ctx.writeAndFlush(request);
        log.info("[Client] request is :{}", request);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        CalculateResponse response = (CalculateResponse) msg;
        log.info("[Client] response is :{}", response);
    }

}
This part is simple: in channelActive we send the request directly, and the request object is hard-coded here for simplicity.

channelRead0 receives the corresponding result from the server and logs it.
CalculateRequestEncoder
The request parameter is an object, which Netty cannot transmit directly, so we encode it into primitive values:
package com.github.houbb.rpc.client.encoder;
import com.github.houbb.rpc.common.model.CalculateRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * @author binbin.hou
 * @since 0.0.3
 */
public class CalculateRequestEncoder extends MessageToByteEncoder<CalculateRequest> {
    @Override
    protected void encode(ChannelHandlerContext ctx, CalculateRequest msg, ByteBuf out) throws Exception {
        int one = msg.getOne();
        int two = msg.getTwo();
        out.writeInt(one);
        out.writeInt(two);
    }
}
CalculateResponseDecoder
The same applies to the server response: we need to convert the primitive values back into the object we need.
package com.github.houbb.rpc.client.decoder;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Response parameter decoding
 *
 * @author binbin.hou
 * @since 0.0.3
 */
public class CalculateResponseDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        boolean success = in.readBoolean();
        int sum = in.readInt();
        CalculateResponse response = new CalculateResponse(success, sum);
        out.add(response);
    }
}
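Putting the two client codecs together, the wire format is tiny: a request is 8 bytes (two 4-byte ints) and a response is 5 bytes (a 1-byte boolean followed by a 4-byte int), which is exactly what the hex dumps in the test logs below show. Here is a standalone sketch (not part of the original project; the class name is made up) that checks this with Netty's Unpooled buffers.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// Standalone sketch of the wire format produced by the codecs above.
public class WireFormatSketch {

    public static void main(String[] args) {
        // request: two ints -> 8 bytes on the wire
        ByteBuf request = Unpooled.buffer();
        request.writeInt(1);
        request.writeInt(2);
        System.out.println("request bytes: " + request.readableBytes());   // 8

        // response: boolean + int -> 5 bytes on the wire
        ByteBuf response = Unpooled.buffer();
        response.writeBoolean(true);
        response.writeInt(3);
        System.out.println("response bytes: " + response.readableBytes()); // 5
    }

}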
The server

Set up the handler classes

The handler classes in RpcServer need to be tweaked slightly; everything else stays the same.
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(workerGroup, bossGroup)
.channel(NioServerSocketChannel.class)
// Prints logs
.handler(new LoggingHandler(LogLevel.INFO))
.childHandler(new ChannelInitializer<Channel>() {
@Override
protected void initChannel(Channel ch) throws Exception {
ch.pipeline()
.addLast(new CalculateRequestDecoder())
.addLast(new CalculateResponseEncoder())
.addLast(new RpcServerHandler());
}
})
// This option affects connections that have not yet been accepted
.option(ChannelOption.SO_BACKLOG, 128)
// TCP keep-alive: after a period of inactivity, probes are sent to check whether the peer is still alive
.childOption(ChannelOption.SO_KEEPALIVE, true);
RpcServerHandler
We started with an empty implementation, so let’s add the corresponding implementation.
package com.github.houbb.rpc.server.handler;
import com.github.houbb.log.integration.core.Log;
import com.github.houbb.log.integration.core.LogFactory;
import com.github.houbb.rpc.common.model.CalculateRequest;
import com.github.houbb.rpc.common.model.CalculateResponse;
import com.github.houbb.rpc.common.service.Calculator;
import com.github.houbb.rpc.server.service.CalculatorService;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * @author binbin.hou
 * @since 0.0.1
 */
public class RpcServerHandler extends SimpleChannelInboundHandler {
    private static final Log log = LogFactory.getLog(RpcServerHandler.class);

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        final String id = ctx.channel().id().asLongText();
        log.info("[Server] channel {} connected " + id);
    }

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
        final String id = ctx.channel().id().asLongText();
        CalculateRequest request = (CalculateRequest) msg;
        log.info("[Server] receive channel {} request: {} from ", id, request);

        // Business processing
        Calculator calculator = new CalculatorService();
        CalculateResponse response = calculator.sum(request);

        // Write the result back to the client
        ctx.writeAndFlush(response);
        log.info("[Server] channel {} response {}", id, response);
    }

}
After reading the client's message, we get the CalculateRequest parameter, call the sum method to obtain the corresponding CalculateResponse, and write the result back to the client.
CalculateRequestDecoder
This corresponds one-to-one with the client encoder: we first convert the primitive values received over the wire back into a CalculateRequest object.
package com.github.houbb.rpc.server.decoder;
import com.github.houbb.rpc.common.model.CalculateRequest;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
/**
 * Request parameter decoding
 *
 * @author binbin.hou
 * @since 0.0.3
 */
public class CalculateRequestDecoder extends ByteToMessageDecoder {
    @Override
    protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        int one = in.readInt();
        int two = in.readInt();
        CalculateRequest request = new CalculateRequest(one, two);
        out.add(request);
    }
}
CalculateResponseEncoder
Here, similar to the client, we need to convert the response into primitive values for network transmission.
package com.github.houbb.rpc.server.encoder;
import com.github.houbb.rpc.common.model.CalculateResponse;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.MessageToByteEncoder;
/**
 * @author binbin.hou
 * @since 0.0.3
 */
public class CalculateResponseEncoder extends MessageToByteEncoder<CalculateResponse> {
    @Override
    protected void encode(ChannelHandlerContext ctx, CalculateResponse msg, ByteBuf out) throws Exception {
        boolean success = msg.isSuccess();
        int result = msg.getSum();
        out.writeBoolean(success);
        out.writeInt(result);
    }
}
CalculatorService
The server-side implementation of the Calculator interface:
public class CalculatorService implements Calculator {

    @Override
    public CalculateResponse sum(CalculateRequest request) {
        int sum = request.getOne() + request.getTwo();
        return new CalculateResponse(true, sum);
    }

}
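As a quick sanity check of the service logic itself, independent of the network layer (a sketch, not taken from the original repository):

Calculator calculator = new CalculatorService();
CalculateResponse response = calculator.sum(new CalculateRequest(1, 2));
// expected: CalculateResponse{success=true, sum=3}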
Test

The server

Start the server:
new RpcServer().start();
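A minimal launcher sketch; the class name RpcServerMain is an assumption, since the article only shows the start() call. The client further down is launched the same way.

// Launcher sketch; RpcServerMain is an assumed name, not from the original repo.
public class RpcServerMain {

    public static void main(String[] args) {
        // boots the Netty server; per the logs below it listens on port 9527
        new RpcServer().start();
    }

}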
Server startup logs:
[DEBUG] [2021-10-05 11:53:11.795] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 11:53:11.807] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC server starting
Oct 5, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler channelRegistered
INFO: [id: 0xd399474f] REGISTERED
Oct 5, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler bind
INFO: [id: 0xd399474f] BIND: 0.0.0.0/0.0.0.0:9527
Oct 5, 2021 11:53:13 AM io.netty.handler.logging.LoggingHandler channelActive
INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] ACTIVE
[INFO] [2021-10-05 11:53:13.101] [Thread-0] [c.g.h.r.s.c.RpcServer.run] - RPC server startup complete, listening on port [9527]
The client
Start the client:
new RpcClient().start();
The log is as follows:
[DEBUG] [2021-10-05 11:54:12.158] [main] [c.g.h.l.i.c.LogFactory.setImplementation] - Logging initialized using 'class com.github.houbb.log.integration.adaptors.stdout.StdOutExImpl' adapter.
[INFO] [2021-10-05 11:54:12.164] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC client starting
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRegistered
INFO: [id: 0x4d75c580] REGISTERED
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler connect
INFO: [id: 0x4d75c580] CONNECT: /127.0.0.1:9527
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelActive
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] ACTIVE
[INFO] [2021-10-05 11:54:13.403] [Thread-0] [c.g.h.r.c.c.RpcClient.run] - RPC client startup complete, listening on port: 9527
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler write
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] WRITE: 8B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 01 00 00 00 02                         |........        |
+--------+-------------------------------------------------+----------------+
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler flush
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] FLUSH
[INFO] [2021-10-05 11:54:13.450] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelActive] - [Client] request is :CalculateRequest{one=1, two=2}
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRead
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ: 5B
         +-------------------------------------------------+
         |  0  1  2  3  4  5  6  7  8  9  a  b  c  d  e  f |
+--------+-------------------------------------------------+----------------+
|00000000| 01 00 00 00 03                                  |.....           |
+--------+-------------------------------------------------+----------------+
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelReadComplete
INFO: [id: 0x4d75c580, L:/127.0.0.1:54030 - R:/127.0.0.1:9527] READ COMPLETE
[INFO] [2021-10-05 11:54:13.508] [nioEventLoopGroup-2-1] [c.g.h.r.c.c.RpcClient.channelRead0] - [Client] response is :CalculateResponse{success=true, sum=3}
As you can see, the corresponding request parameters and response results are printed out.
Of course, the server also has the corresponding new log:
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelRead
INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ: [id: 0xbc9f5927, L:/127.0.0.1:9527 - R:/127.0.0.1:54030]
Oct 5, 2021 11:54:13 AM io.netty.handler.logging.LoggingHandler channelReadComplete
INFO: [id: 0xd399474f, L:/0:0:0:0:0:0:0:0:9527] READ COMPLETE
[INFO] [2021-10-05 11:54:13.432] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelActive] - [Server] channel {} connected 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927
[INFO] [2021-10-05 11:54:13.495] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] receive channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 request: CalculateRequest{one=1, two=2} from
[INFO] [2021-10-05 11:54:13.505] [nioEventLoopGroup-2-1] [c.g.h.r.s.h.RpcServerHandler.channelRead0] - [Server] channel 00e04cfffe360988-00001d34-00000001-2a80d950d8166c0c-bc9f5927 response CalculateResponse{success=true, sum=3}
Summary

To make it easier to follow along, the source code above is open source:

github.com/houbb/rpc

I hope this article is helpful to you. If you like it, please like, bookmark, and share.

I am Lao Ma (old horse), and I look forward to seeing you next time.