The target
-
Introduction to Apache Dubbo plug-in
- Introduction to Metadata
-
Configure the Apache Dubbo plug-in
- The Bootstrap pom configuration
- Soul – admin configuration
- Pom configuration for dubbo service
-
Introduction to Apache Dubbo generalization calls
- Use a generalization call through an API
- Use generalized calls through Spring
- Generalize the call implementation process
-
The Soul Dubbo plug-in calls parsing
- ApachDubboPlugin generalization call ready
- ApacheDubboProxySerivce
- DubboResponsePlugin
- WebFluxResultUtils returns the result
-
Introduction to Dubbo generalization calls
-
conclusion
-
reference
Introduction to Apache Dubbo plug-in
Apache Dubbo is a high-performance, lightweight, open source Java service framework that provides six core capabilities: high-performance RPC calls for interface proxies, intelligent fault tolerance and load balancing, automatic service registration and discovery, highly scalable capabilities, run-time traffic scheduling, and visual service governance and operation. The Dubbo plug-in in the gateway is mainly used to convert Http protocol into Dubbo protocol, which is also the key of the gateway to implement the Dubbo generalization call. The Dubbo plug-in requires metadata to make Dubbo calls.
Introduction to Metadata
The metadata is used to get the actual request PATH, methodName, and parameterTypes ready for the generalization call during protocol transformation
-
In the database, we have a form that stores Dubbo metadata alone, and the data from this table is synchronized to the gateway’S JVM memory through the data synchronization scheme
-
The table structure is as follows
CREATE TABLE IF NOT EXISTS `meta_data` ( `id` varchar(128) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'id', `app_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'Application Name', `path` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'Path, cannot be repeated', `path_desc` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'Path description', `rpc_type` varchar(64) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NOT NULL COMMENT 'the RPC types', `service_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT 'Service Name', `method_name` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT 'Method name', `parameter_types` varchar(255) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT 'Parameter type Multiple parameter types separated by commas', `rpc_ext` varchar(1024) CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci NULL DEFAULT NULL COMMENT 'Extended RPC information, JSON format', `date_created` datetime(0) NOT NULL COMMENT 'Creation time', `date_updated` datetime(0) NOT NULL ON UPDATE CURRENT_TIMESTAMP(0) COMMENT 'Update Time', `enabled` tinyint(4) NOT NULL DEFAULT 0 COMMENT 'Enabled status'.PRIMARY KEY (`id`) USING BTREE ) ENGINE = InnoDB CHARACTER SET = utf8mb4 COLLATE = utf8mb4_unicode_ci ROW_FORMAT = Dynamic; Copy the code
path
The field is mainly when requesting the gateway and will be based on yourpath
Field to match a piece of data, and then proceed to the subsequent processing flowrpc_ext
Field if the agent’s interface isDubbo
Type of the service interfacegroup
version
Field, then the information is stored torpc_ext
中- each
Dubbo
The interface method handles a single piece of metadata, compared to SpringCloud and HTTP, which store only /contextPath/** and none, respectively
Configure the Apache Dubbo plug-in
Soul – the bootstrap pom configuration
<dependency> <groupId>org.dromara</groupId> <artifactId>soul-spring-boot-starter-plugin-apache-dubbo</artifactId> <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.dubbo</groupId> <artifactId>dubbo</artifactId> <version>2.7. 5</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-client</artifactId> <version>${curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>${curator.version}</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>${curator.version}</version> </dependency> Copy the code
Soul – admin configuration
Log in to soul-admin and turn on the Dubbo configuration option on the plug-in management page, and fill in the connection address of the registry
Pom configuration for dubbo service
<dependency>
<groupId>org.dromara</groupId>
<artifactId>soul-spring-boot-starter-client-apache-dubbo</artifactId>
<version>${soul.version}</version>
</dependency>
@SoulDubboClient(path = "/insert", desc = "Insert a row of data")
public DubboTest insert(final DubboTest dubboTest) {
dubboTest.setName("hello world Soul Apache Dubbo: " + dubboTest.getName());
return dubboTest;
}
Copy the code
The proxied service uses the soul-spring-boot-starter-client-apache-dubbo client dependencies provided by the proxied service, and uses the @soulDubboClient attribute to register the interface name, parameter type, and parameter content at startup The soul-admin side then synchronizes the data to the bootstrap side.
Introduction to Apache Dubbo generalization calls
The generalized interface invocation method is mainly used when the client does not have API interfaces and model class elements. All POJOs in parameters and return values are represented by Map, which is usually used for framework integration. All service implementations can be invoked through GenericSerivce.
Using generic calls via API (the way gateways are currently used)
ReferenceConfig<GenericService> reference = new ReferenceConfig<>();
reference.setGeneric(true);
reference.setApplication(applicationConfig);
reference.setRegistry(registryConfig);
reference.setInterface(metaData.getServiceName());
reference.setProtocol("dubbo");
Copy the code
The gateway registers the use of generalization calls through API declarations
Use generalized calls through Spring
<dubbo:reference id="barService" interface="com.foo.BarService" generic="true" />
Copy the code
Generalize the call implementation process
+ -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + + -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- -- + | consumer end | | the provider side | | | | | | | | | | | | | | | | | | +------------------+ | | +--------------+ | | |GenericImplFilter | | Invocation | |GenericFilter | | | +----> | +-------------------------> | | | | | +------------------+ | | +--------------+ | | +-----------+ | | | +-----------+ | | | | | | | | | | | |Client | | | +--> | Service | | | | | | | | | | | +-----------+ | | +-------+---+ | | | | | | | ^ +------------------+ | | +--------------+ | | | | |GenericImplFilter | | | |GenericFilter | <----------+ | | +-------------+ | <-------------------------+ | | | +------------------+ | | +--------------+ | | | | | | | | | | | | | | | | | +-------------------------------------------+ +-------------------------------------------+Copy the code
GenericService this interface is very similar to Java reflection call, just provide the method name, parameter type and parameter value can directly call the corresponding method. – GenericFilter: converts the parameters of the hashMap structure to the corresponding POJOs when called – Returns the result that the POJOs are converted to hashMap
– GenericImplFilter: converts parameters on the consumer side and converts POJOs into hashMap interfaces
/**
* Generic service interface
*
* @export* /
public interface GenericService {
/**
* Generic invocation
*
* @paramMethod Specifies the name of the method, for example, findPerson. If there is an overloaded method, bring a list of arguments, for example, findPerson(java.lang.string) *@paramParameterTypes Parameter type *@paramArgs parameter list *@returnInvocation Return value *@throwsAn exception thrown by the GenericException method */
Object $invoke(String method, String[] parameterTypes, Object[] args) throws GenericException;
default CompletableFuture<Object> $invokeAsync(String method, String[] parameterTypes, Object[] args) throws GenericException {
Object object = $invoke(method, parameterTypes, args);
if (object instanceof CompletableFuture) {
return (CompletableFuture<Object>) object;
}
returnCompletableFuture.completedFuture(object); }}Copy the code
The Soul Dubbo plug-in calls parsing
When a business request is initiated, the Handle method of the SoulWebHandler class is first entered and plugins are called from the DefaultSoulPluginChain class.
@Override
public Mono<Void> handle(@NonNull final ServerWebExchange exchange) {
return new DefaultSoulPluginChain(plugins).execute(exchange).subscribeOn(scheduler);
}
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
// Responsive programming
return Mono.defer(() -> {
// Check whether the current index is < the number of plug-ins
if (this.index < plugins.size()) {
// Call one of the plug-ins in turn from the plugins
SoulPlugin plugin = plugins.get(this.index++);
// Check whether the plug-in is not open
Boolean skip = plugin.skip(exchange);
if (skip) {
return this.execute(exchange);
}
return plugin.execute(exchange, this);
}
return Mono.empty();
});
}
Copy the code
This chapter focuses only on Apache Dubbo, so we’ll focus on calling the Dubbo plug-in.Through the Debug gateway program, we know that the call is made one by one in the above order. Now let’s focus on
ApacheDubboPlugin
ApachDubboPlugin generalization call ready
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
// Get the dubbo_params data
String body = exchange.getAttribute(Constants.DUBBO_PARAMS);
// Get the exchange Context property value
SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
assertsoulContext ! =null;
// Get the Exchange metaData property value
MetaData metaData = exchange.getAttribute(Constants.META_DATA);
// Check whether the metaData is incorrect. If the metaData is incorrect, the system returns an error message
if(! checkMetaData(metaData)) {assertmetaData ! =null;
log.error(" path is :{}, meta data have error.... {}", soulContext.getPath(), metaData.toString());
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = SoulResultWrap.error(SoulResultEnum.META_DATA_ERROR.getCode(), SoulResultEnum.META_DATA_ERROR.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// Determine whether the parameterTypes and body of metaData are empty. If not, return the body error message
if (StringUtils.isNoneBlank(metaData.getParameterTypes()) && StringUtils.isBlank(body)) {
exchange.getResponse().setStatusCode(HttpStatus.INTERNAL_SERVER_ERROR);
Object error = SoulResultWrap.error(SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getCode(), SoulResultEnum.DUBBO_HAVE_BODY_PARAM.getMsg(), null);
return WebFluxResultUtils.result(exchange, error);
}
// Async Dubbo GenericsService with exchange, body, metaData
final Mono<Object> result = dubboProxyService.genericInvoker(body, metaData, exchange);
return result.then(chain.execute(exchange));
}
Copy the code
The parameters required for the generalization call are first checked
ApacheDubboProxyService
public Mono<Object> genericInvoker(final String body, final MetaData metaData, final ServerWebExchange exchange) throws SoulException {
// issue(https://github.com/dromara/soul/issues/471), add dubbo tag route
String dubboTagRouteFromHttpHeaders = exchange.getRequest().getHeaders().getFirst(Constants.DUBBO_TAG_ROUTE);
if (StringUtils.isNotBlank(dubboTagRouteFromHttpHeaders)) {
RpcContext.getContext().setAttachment(CommonConstants.TAG_KEY, dubboTagRouteFromHttpHeaders);
}
// Obtain ferference based on the metaData path
ReferenceConfig<GenericService> reference = ApplicationConfigCache.getInstance().get(metaData.getPath());
if (Objects.isNull(reference) || StringUtils.isEmpty(reference.getInterface())) {
ApplicationConfigCache.getInstance().invalidate(metaData.getPath());
reference = ApplicationConfigCache.getInstance().initRef(metaData);
}
// Obtain the GenericService instance of the generalization call according to ferference
GenericService genericService = reference.get();
Pair<String[], Object[]> pair;
if (ParamCheckUtils.dubboBodyIsEmpty(body)) {
pair = new ImmutablePair<>(new String[]{}, new Object[]{});
} else {
// organizes the parameterTypes and parameter values for Dubbo generalization calls based on body and parameterTypes
pair = dubboParamResolveService.buildParameter(body, metaData.getParameterTypes());
}
// Now use the GenericSerice default method $invokeAsync for the asynchronous invocation
CompletableFuture<Object> future = genericService.$invokeAsync(metaData.getMethodName(), pair.getLeft(), pair.getRight());
return Mono.fromFuture(future.thenApply(ret -> {
if (Objects.isNull(ret)) {
ret = Constants.DUBBO_RPC_RESULT_EMPTY;
}
// Copy the result and type to the property corresponding to exchagne
exchange.getAttributes().put(Constants.DUBBO_RPC_RESULT, ret);
exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.SUCCESS.getName());
return ret;
})).onErrorMap(exception -> exception instanceof GenericException ? new SoulException(((GenericException) exception).getExceptionMessage()) : new SoulException(exception));
}
Copy the code
DubboResponsePlugin
@Overridepublic Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) { return chain.execute(exchange).then(Mono.defer(() -> { final Object result = exchange.getAttribute(Constants.DUBBO_RPC_RESULT); if (Objects.isNull(result)) { Object error = SoulResultWrap.error(SoulResultEnum.SERVICE_RESULT_ERROR.getCode(), SoulResultEnum.SERVICE_RESULT_ERROR.getMsg(), null); return WebFluxResultUtils.result(exchange, error); } Object success = SoulResultWrap.success(SoulResultEnum.SUCCESS.getCode(), SoulResultEnum.SUCCESS.getMsg(), JsonUtils.removeClass(result)); returnWebFluxResultUtils.result(exchange, success); })); }Copy the code
WebFluxResultUtils returns the result
Introduction to Dubbo generalization calls
The Dubbo generalization call is divided into two parts: how the consumer intercepts the generalization call using GenericImplFilter, and how the service provider intercepts the request using GenericImplFilter to serialize the generalization parameters and request the specific service.
Service consumers org. Apache. Dubbo. RPC. Filter. How GenericImplFilter intercept generalization call
@Activate(group = CommonConstants.CONSUMER, value = GENERIC_KEY, order = 20000)
public class GenericImplFilter implements Filter.Filter.Listener {
@Override
public Result invoke(Invoker
invoker, Invocation invocation) throws RpcException {
/ /... Omit non-core code
// Determine if it is a generalization call
if (isMakingGenericCall(generic, invocation)) {
// Get the generalization parameter
Object[] args = (Object[]) invocation.getArguments()[2];
// If generalized to nativeJava
if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (Object arg : args) {
if(! (byte[].class == arg.getClass())) {
error(generic, byte[].class.getName(), arg.getClass().getName()); }}// If the generalization is bean
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (Object arg : args) {
if(! (arginstanceofJavaBeanDescriptor)) { error(generic, JavaBeanDescriptor.class.getName(), arg.getClass().getName()); }}}// Set attachment to be invoked with the server
invocation.setAttachment(
GENERIC_KEY, invoker.getUrl().getParameter(GENERIC_KEY));
}
// Initiate a remote call
return invoker.invoke(invocation);
}
private boolean isMakingGenericCall(String generic, Invocation invocation) {
return(invocation.getMethodName().equals($INVOKE) || invocation.getMethodName().equals($INVOKE_ASYNC)) && invocation.getArguments() ! =null
&& invocation.getArguments().length == 3&& ProtocolUtils.isGeneric(generic); }}Copy the code
GenericImplFilter implements the interface Filter(I won’t describe the Filter in Dubbo) and then executes the Invoke method, which does the following: – Parameter verification, check whether the call is a generalization call – get generalization parameters – Determine the generalization call method: iterate over each parameter, and then determine whether the parameter generalization method is NativeJava or bean method – initiate a remote call
The service provider intercepts the generalization request through GenericFilter
@Activate(group = CommonConstants.PROVIDER, order = -20000)
public class GenericFilter implements Filter.Filter.Listener {
@Override
public Result invoke(Invoker
invoker, Invocation inv) throws RpcException {
// Check parameters
if((inv.getMethodName().equals($INVOKE) || inv.getMethodName().equals($INVOKE_ASYNC)) && inv.getArguments() ! =null
&& inv.getArguments().length == 3
&& !GenericService.class.isAssignableFrom(invoker.getInterface())) {
// Get the parameter name, parameter type, and parameter value
String name = ((String) inv.getArguments()[0]).trim();
String[] types = (String[]) inv.getArguments()[1];
Object[] args = (Object[]) inv.getArguments()[2];
try {
// Use reflection to get the method calledMethod method = ReflectUtils.findMethodByMethodSignature(invoker.getInterface(), name, types); Class<? >[] params = method.getParameterTypes();if (args == null) {
args = new Object[params.length];
}
// Get the generalization type used by the generalization reference,true or bean or NativeJava
String generic = inv.getAttachment(GENERIC_KEY);
if (StringUtils.isBlank(generic)) {
generic = RpcContext.getContext().getAttachment(GENERIC_KEY);
}
// Deserialize the input parameter with true if generic=true
if (StringUtils.isEmpty(generic)
|| ProtocolUtils.isDefaultGenericSerialization(generic)
|| ProtocolUtils.isGenericReturnRawResult(generic)) {
args = PojoUtils.realize(args, params, method.getGenericParameterTypes());
// If generic= nativeJava, the nativeJava method is used to deserialize the input parameter
} else if (ProtocolUtils.isJavaGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (byte[].class == args[i].getClass()) {
try (UnsafeByteArrayInputStream is = new UnsafeByteArrayInputStream((byte[]) args[i])) {
args[i] = ExtensionLoader.getExtensionLoader(Serialization.class)
.getExtension(GENERIC_SERIALIZATION_NATIVE_JAVA)
.deserialize(null, is).readObject();
} catch (Exception e) {
throw new RpcException("Deserialize argument [" + (i + 1) + "] failed.", e); }}else {
throw new RpcException(...);
}
}
// If generic=bean, the input parameter is deserialized in bean mode
} else if (ProtocolUtils.isBeanGenericSerialization(generic)) {
for (int i = 0; i < args.length; i++) {
if (args[i] instanceof JavaBeanDescriptor) {
args[i] = JavaBeanSerializeUtil.deserialize((JavaBeanDescriptor) args[i]);
} else {
throw newRpcException(...) ; }}}...// Pass the request to the next Filter in the FilterChain and return the result
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args, inv.getAttachments(), inv.getAttributes());
rpcInvocation.setInvoker(inv.getInvoker());
rpcInvocation.setTargetServiceUniqueName(inv.getTargetServiceUniqueName());
return invoker.invoke(rpcInvocation);
} catch (NoSuchMethodException e) {
throw new RpcException(e.getMessage(), e);
} catch (ClassNotFoundException e) {
throw newRpcException(e.getMessage(), e); }}// If it is not a generalization call, the request is passed directly to the next Filter in the FilterChain
returninvoker.invoke(inv); }}Copy the code
This is how the Dubbo service provider intercepts a generalization request and processes it: – Parameter verification, determine whether the request is generalized call – obtain parameter name, parameter type, parameter value – use reflection to obtain the method called, and the use of the generalization method true or NativeJava or bean – according to the generalization method, deserialize the generalization parameter – the request, The called method, parameters, and context information are passed to the next Filter in the FilterChain and the Result Result is returned – deserialized to the service consumer according to the generalization method
conclusion
The above analysis from how to configure the Dubbo plug-in to the whole call process, and then introduce how the service consumer and service provider intercept the generalization call process to serialize the parameters, hoping to help you
reference
My.oschina.net/u/4564034/b…
Qsli. Making. IO / 2018/05/02 /…