Overview of this article to achieve a simple RPC communication framework, roughly the core process:

Implementation of client-side proxy class processing logic: InvocationHandler

Scan the proxied interface, generate the proxy class, and inject it into the Spring container

Finds the specified implementation class, based on the interface called, and completes the call.

Scan component startup class:

@enablenetTyrpcclient (basePackages = {” com.Net tyRpc”}) public class NettyRpcSpringBootApplication {

public static void main(String[] args) {
    SpringApplication.run(NettyRpcSpringBootApplication.class);
}
Copy the code

} Custom scan annotations EnableNettyRpcClient:

@Retention(RetentionPolicy.RUNTIME) @Target(ElementType.TYPE) @Documented @Import(NettyRpcClientRegistrar.class) public @interface EnableNettyRpcClient {

String[] basePackages() default {}; String[] basePackages() {}; Class<? >[] basePackageClasses() default {};Copy the code

NettyRpcClientRegistrar:

public class NettyRpcClientRegistrar implements ImportBeanDefinitionRegistrar, BeanClassLoaderAware {

private ClassLoader classLoader; @Override public void setBeanClassLoader(ClassLoader classLoader) { this.classLoader = classLoader; } @Override public void registerBeanDefinitions(AnnotationMetadata importingClassMetadata, BeanDefinitionRegistry registry) { ClassPathScanningCandidateComponentProvider scan = getScanner(); // Specify annotation, similar to Feign annotation scan.addincludeFilter (new AnnotationTypeFilter(nettyrpcclient.class)); Set<BeanDefinition> candidateComponents = new HashSet<>(); for (String basePackage : getBasePackages(importingClassMetadata)) { candidateComponents.addAll(scan.findCandidateComponents(basePackage)); } candidateComponents.stream().forEach(beanDefinition -> { if (! registry.containsBeanDefinition(beanDefinition.getBeanClassName())) { if (beanDefinition instanceof AnnotatedBeanDefinition) { AnnotatedBeanDefinition annotatedBeanDefinition = (AnnotatedBeanDefinition) beanDefinition; AnnotationMetadata annotationMetadata = annotatedBeanDefinition.getMetadata(); Assert.isTrue(annotationMetadata.isInterface(), "@NettyRpcClient can only be specified on an interface"); Map<String, Object> attributes = annotationMetadata.getAnnotationAttributes(NettyRpcClient.class.getCanonicalName()); this.registerNettyRpcClient(registry, annotationMetadata,attributes); }}}); } private void registerNettyRpcClient(BeanDefinitionRegistry registry, AnnotationMetadata annotationMetadata, Map<String, Object> attributes) { String className = annotationMetadata.getClassName(); BeanDefinitionBuilder definition = BeanDefinitionBuilder .genericBeanDefinition(NettyRpcClientFactoryBean.class); definition.setAutowireMode(AbstractBeanDefinition.AUTOWIRE_BY_TYPE); definition.addPropertyValue("type", className); String name = attributes.get("name") == null ? "" :(String)(attributes.get("name")); String alias = name + "NettyRpcClient"; AbstractBeanDefinition beanDefinition = definition.getBeanDefinition(); beanDefinition.setPrimary(true); BeanDefinitionHolder holder = new BeanDefinitionHolder(beanDefinition, className, new String[] { alias }); BeanDefinitionReaderUtils.registerBeanDefinition(holder, registry); } protected ClassPathScanningCandidateComponentProvider getScanner() { return new ClassPathScanningCandidateComponentProvider(false) { @Override protected boolean isCandidateComponent( AnnotatedBeanDefinition beanDefinition) {if (beanDefinition for getMetadata () isIndependent ()) {/ / whether the interface inherited the Annotation Annotation if (beanDefinition.getMetadata().isInterface() && beanDefinition.getMetadata() .getInterfaceNames().length == 1 && Annotation.class.getName().equals(beanDefinition.getMetadata().getInterfaceNames()[0])) { try { Class<? > target = ClassUtils.forName(beanDefinition.getMetadata().getClassName(), NettyRpcClientRegistrar.this.classLoader); return ! target.isAnnotation(); } catch (Exception ex) { this.logger.error( "Could not load target class: " + beanDefinition.getMetadata().getClassName(), ex); } } return true; } return false; }}; } protected Set<String> getBasePackages(AnnotationMetadata importingClassMetadata) { Map<String, Object> attributes = importingClassMetadata .getAnnotationAttributes(EnableNettyRpcClient.class.getCanonicalName()); Set<String> basePackages = new HashSet<>(); for (String pkg : (String[]) attributes.get("basePackages")) { if (StringUtils.hasText(pkg)) { basePackages.add(pkg); } } for (Class<? > clazz : (Class[]) attributes.get("basePackageClasses")) { basePackages.add(ClassUtils.getPackageName(clazz)); } if (basePackages.isEmpty()) { basePackages.add( ClassUtils.getPackageName(importingClassMetadata.getClassName())); } return basePackages; }Copy the code

} the scanning agent factory class: NettyRpcClientFactoryBean

@Data @EqualsAndHashCode(callSuper = false) public class NettyRpcClientFactoryBean implements FactoryBean{

private Class<? > type; @Override public Object getObject() throws Exception { return Proxy.newProxyInstance(type.getClassLoader(), new Class[]{type}, new NettyRpcInvocationHandler(type)); } @Override public Class<? > getObjectType() { return this.type; }Copy the code

} request interceptor implementation class: NettyRpcInvocationHandler

public class NettyRpcInvocationHandler implements InvocationHandler {

private Class<? > type; public NettyRpcInvocationHandler(Class<? > type){ this.type = type; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {// Construct call information RpcInfo RpcInfo = new RpcInfo(); rpcInfo.setClassName(type.getName()); rpcInfo.setMethodName(method.getName()); rpcInfo.setParamTypes(method.getParameterTypes()); rpcInfo.setParams(args); NioEventLoopGroup = new NioEventLoopGroup(); Bootstrap bootstrap = new Bootstrap(); RPCClientHandler rpcClientHandler = new RPCClientHandler(); try { bootstrap.group(group) .channel(NioSocketChannel.class) .option(ChannelOption.SO_KEEPALIVE, true) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); / / deserialize objects specified class parser, null said to use the default class loader ch. Pipeline () addLast (new ObjectDecoder (1024 * 64, ClassResolvers cacheDisabled (null)));  ch.pipeline().addLast(rpcClientHandler); }}); ChannelFuture Future = bootstrap.connect("127.0.0.1", 80).sync(); ChannelFuture = bootstrap.connect("127.0.0.1", 80). Future.channel ().writeAndFlush(rpcInfo).sync(); Future.channel ().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); } finally { group.shutdownGracefully(); } / / returning the results return rpcClientHandler. GetRpcResult (); }Copy the code

} Client Netty client ChannelHandler: : RPCClientHandler

public class RPCClientHandler extends ChannelHandlerAdapter {

/** * return result of RPC call */ private Object rpcResult; public Object getRpcResult() { return rpcResult; } public void setRpcResult(Object rpcResult) { this.rpcResult = rpcResult; } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { setRpcResult(msg); ctx.close(); }Copy the code

} request entity RpcInfo:

@Data public class RpcInfo implements Serializable {

/** * private String className; /** * private String methodName; /** * Private Class[] paramTypes; /** * private Object[] params;Copy the code

} Server NettyServer:

public class NettyRpcServer {

public static void main(String[] args){ NioEventLoopGroup boss = new NioEventLoopGroup(); NioEventLoopGroup worker = new NioEventLoopGroup(); ServerBootstrap bootstrap = new ServerBootstrap(); try { bootstrap.group(boss, worker) .channel(NioServerSocketChannel.class) .option(ChannelOption.SO_BACKLOG, 128) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new ObjectEncoder()); ch.pipeline().addLast(new ObjectDecoder(1024 * 64, ClassResolvers.cacheDisabled(null))); ch.pipeline().addLast(new NettyRpcServerHandler()); }}); ChannelFuture Future = bootstrap.bind(80).sync(); ChannelFuture = bootstrap.bind(80).sync(); future.channel().closeFuture().sync(); } catch (Exception e) { e.printStackTrace(); }finally { boss.shutdownGracefully(); worker.shutdownGracefully(); }}Copy the code

} Netty server ChannelHandler:

public class NettyRpcServerHandler extends ChannelHandlerAdapter {

@override public void channelRead(ChannelHandlerContext CTX, Object MSG) throws Exception { RpcInfo = (RpcInfo) MSG; String implName = getImplClassName(rpcInfo.getClassName()); Class<? > clazz = Class.forName(implName); Method method = clazz.getMethod(rpcInfo.getMethodName(), rpcInfo.getParamTypes()); Object result = method.invoke(clazz.newInstance(), rpcInfo.getParams()); ctx.writeAndFlush(result); } private String getImplClassName(String interfaceName) throws ClassNotFoundException { Class interClass = Class.forName(interfaceName); String servicePath = "com.nettyRpc.server"; Reflections reflections = new Reflections(servicePath); Set<Class> implClasses = reflections.getSubTypesOf(interClass); if (implClasses.isEmpty()) { System.err.println("impl class is not found!" ); } else if (implClasses.size() > 1) { System.err.println("there are many impl classes, not sure invoke which"); } else { Class[] classes = implClasses.toArray(new Class[1]); return classes[0].getName(); } return null; }Copy the code

} request sample proxy client:

@NettyRpcClient public interface UserService { String callRpc(String param); } Server implementation class:

public class UserServiceImpl implements UserService {

@Override
public String callRpc(String param) {
    System.out.println(param);
    return param;
}
Copy the code

} remote call:

@RestController public class UserController {

@Autowired UserService userService; @RequestMapping(value = "/callRpc") public String callRpcTest(){ userService.callRpc("callRpc execute......" ); return "ok"; }Copy the code

}

Project address: github.com/admin801122…