RocketMQ Binder integrates message subscription
The AbstractMessageChannelBinder class defines the contract for creating a MessageProducer: when the binder is initialized, its createConsumerEndpoint method is called.
RocketMQMessageChannelBinder implements that method to create and initialize the RocketMQInboundChannelAdapter.
RocketMQMessageChannelBinder createConsumerEndpoint method:
@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
        String group,
        ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties)
        throws Exception {
    // A consumer group is mandatory for every channel.
    if (group == null || "".equals(group)) {
        throw new RuntimeException(
                "'group must be configured for channel " + destination.getName());
    }

    // The container wraps the RocketMQ client API.
    RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer(
            consumerProperties, rocketBinderConfigurationProperties, this);
    listenerContainer.setConsumerGroup(group);
    listenerContainer.setTopic(destination.getName());
    listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
    listenerContainer.setSuspendCurrentQueueTimeMillis(
            consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());
    listenerContainer.setDelayLevelWhenNextConsume(
            consumerProperties.getExtension().getDelayLevelWhenNextConsume());
    listenerContainer
            .setNameServer(rocketBinderConfigurationProperties.getNameServer());
    listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));

    // The adapter holds the container and forwards received messages.
    RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
            listenerContainer, consumerProperties, instrumentationManager);

    topicInUse.put(destination.getName(), group);

    ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
            group, consumerProperties);
    if (consumerProperties.getMaxAttempts() > 1) {
        // Retries enabled: use a retry template plus a recovery callback.
        rocketInboundChannelAdapter
                .setRetryTemplate(buildRetryTemplate(consumerProperties));
        rocketInboundChannelAdapter
                .setRecoveryCallback(errorInfrastructure.getRecoverer());
    }
    else {
        // No retries: failures go to the error channel.
        rocketInboundChannelAdapter
                .setErrorChannel(errorInfrastructure.getErrorChannel());
    }
    return rocketInboundChannelAdapter;
}
RocketMQInboundChannelAdapter is an adapter: it plugs into the Spring Framework retry and recovery-callback mechanism, and it is responsible for subscribing to messages and converting their format. RocketMQListenerBindingContainer wraps the RocketMQ client API, and the adapter holds a reference to it.
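To make the retry-then-recover idea concrete, here is a minimal sketch in plain Java. It is not Spring Retry and not the binder's code: the class RetrySketch and its deliver method are hypothetical names chosen for illustration, and the sketch simply retries a handler a fixed number of times before handing the last failure to a recoverer.

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for retry-then-recover delivery; this is not Spring Retry. */
class RetrySketch {

    /**
     * Passes the message to the handler up to maxAttempts times (maxAttempts >= 1).
     * If every attempt throws, the last exception is handed to the recoverer.
     * Returns the number of attempts that were made.
     */
    static <T> int deliver(T message, int maxAttempts, Consumer<T> handler,
            Consumer<RuntimeException> recoverer) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // delivered, no further retries needed
            }
            catch (RuntimeException e) {
                last = e; // remember the failure and try again
            }
        }
        recoverer.accept(last); // retries exhausted: fall back to recovery
        return maxAttempts;
    }

    public static void main(String[] args) {
        int attempts = deliver("order-1", 3,
                payload -> System.out.println("handled " + payload),
                error -> System.out.println("recovered from " + error));
        System.out.println("attempts: " + attempts);
    }
}
```

In the binder itself this role is played by the RetryTemplate and the recovery callback set on the adapter when maxAttempts is greater than 1; the sketch only mirrors the control flow.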
RocketMQ provides two consumption modes: orderly consumption and concurrent consumption. In the RocketMQ client API wrapper, the default listener for orderly consumption is DefaultMessageListenerOrderly, and the default listener for concurrent consumption is DefaultMessageListenerConcurrently. Whichever mode is used, every message the listener receives is passed on to the RocketMQListener callback.
The RocketMQListener implementation class is created and initialized in RocketMQInboundChannelAdapter.
RocketMQInboundChannelAdapter onInit method and BindingRocketMQListener inner class:
@Override
protected void onInit() {
    if (consumerProperties == null
            || !consumerProperties.getExtension().getEnabled()) {
        return;
    }
    super.onInit();
    if (this.retryTemplate != null) {
        Assert.state(getErrorChannel() == null,
                "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
                        + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
                        + "send an error message when retries are exhausted");
    }

    // Register the callback that receives every consumed message.
    BindingRocketMQListener listener = new BindingRocketMQListener();
    rocketMQListenerContainer.setRocketMQListener(listener);

    if (retryTemplate != null) {
        this.retryTemplate.registerListener(listener);
    }

    try {
        rocketMQListenerContainer.afterPropertiesSet();
    }
    catch (Exception e) {
        log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
        throw new IllegalArgumentException(
                "rocketMQListenerContainer init error: " + e.getMessage(), e);
    }

    instrumentationManager.addHealthInstrumentation(
            new Instrumentation(rocketMQListenerContainer.getTopic()
                    + rocketMQListenerContainer.getConsumerGroup()));
}

protected class BindingRocketMQListener
        implements RocketMQListener<Message>, RetryListener {

    @Override
    public void onMessage(Message message) {
        boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
        if (enableRetry) {
            // Send through the retry template; the recovery callback runs once retries are exhausted.
            RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
                RocketMQInboundChannelAdapter.this.sendMessage(message);
                return null;
            }, (RecoveryCallback<Object>) RocketMQInboundChannelAdapter.this.recoveryCallback);
        }
        else {
            RocketMQInboundChannelAdapter.this.sendMessage(message);
        }
    }

    @Override
    public <T, E extends Throwable> boolean open(RetryContext context,
            RetryCallback<T, E> callback) {
        return true;
    }

    @Override
    public <T, E extends Throwable> void close(RetryContext context,
            RetryCallback<T, E> callback, Throwable throwable) {
    }

    @Override
    public <T, E extends Throwable> void onError(RetryContext context,
            RetryCallback<T, E> callback, Throwable throwable) {
    }
}
When DefaultMessageListenerOrderly receives a message from RocketMQ, it first calls back the onMessage method of BindingRocketMQListener, which then calls the sendMessage method that RocketMQInboundChannelAdapter inherits from its parent class to send the message to the DirectChannel.
Spring Cloud Stream uses the same model for receiving and for sending messages. A message received in the binder is first sent to a MessageChannel, and the dispatcher of that subscribed MessageChannel then forwards it to the corresponding MessageHandler for processing.
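The path from listener callback to handler can be sketched in a few lines of plain Java. None of the classes below are Spring classes: SimpleChannel and InboundAdapter are hypothetical stand-ins, and the dispatch rule used here (call every subscriber on the sender's thread) is an assumption made for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical, simplified model of the inbound path; none of these are Spring classes. */
class ChannelFlowSketch {

    /** Calls its subscribers synchronously on the sender's thread. */
    static class SimpleChannel {
        private final List<Consumer<String>> handlers = new ArrayList<>();

        void subscribe(Consumer<String> handler) {
            handlers.add(handler);
        }

        /** Returns false when nobody is subscribed, true once all handlers ran. */
        boolean send(String payload) {
            if (handlers.isEmpty()) {
                return false;
            }
            for (Consumer<String> handler : handlers) {
                handler.accept(payload);
            }
            return true;
        }
    }

    /** Plays the adapter's role: the client callback forwards into the channel. */
    static class InboundAdapter {
        private final SimpleChannel outputChannel;

        InboundAdapter(SimpleChannel outputChannel) {
            this.outputChannel = outputChannel;
        }

        boolean onMessage(String payload) {
            return outputChannel.send(payload);
        }
    }

    public static void main(String[] args) {
        SimpleChannel channel = new SimpleChannel();
        channel.subscribe(payload -> System.out.println("handler got: " + payload));
        new InboundAdapter(channel).onMessage("hello");
    }
}
```

The point of the layering is that the adapter never knows who handles the message; it only knows the channel it was given.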
The MessageChannel returned by getOutputChannel() of MessageProducerSupport, the parent class of RocketMQInboundChannelAdapter, is the DirectChannel that was passed in when the RocketMQ binder was initialized.
MessageProducerSupport getOutputChannel method:
@Override
public MessageChannel getOutputChannel() {
    if (this.outputChannelName != null) {
        synchronized (this) {
            if (this.outputChannelName != null) {
                this.outputChannel = getChannelResolver().resolveDestination(this.outputChannelName);
                this.outputChannelName = null;
            }
        }
    }
    return this.outputChannel;
}
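The getOutputChannel method above resolves the channel by name only on first use and then forgets the name. A minimal sketch of the same lazy, double-checked pattern, using a hypothetical LazyChannelSketch class and a plain Function in place of a channel resolver (both are assumptions for illustration):

```java
import java.util.function.Function;

/** Hypothetical sketch of resolving a channel by name on first use. */
class LazyChannelSketch {

    private final Function<String, Object> resolver; // stands in for a channel resolver
    private volatile String channelName;
    private volatile Object channel;

    LazyChannelSketch(String channelName, Function<String, Object> resolver) {
        this.channelName = channelName;
        this.resolver = resolver;
    }

    Object getChannel() {
        if (this.channelName != null) {
            synchronized (this) {
                // Check again inside the lock so the lookup runs only once.
                if (this.channelName != null) {
                    this.channel = this.resolver.apply(this.channelName);
                    this.channelName = null;
                }
            }
        }
        return this.channel;
    }
}
```

Clearing the name after the first lookup is what turns later calls into a cheap field read.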
MessagingTemplate extends the GenericMessagingTemplate class, and the message is actually sent by its doSend() method.
The MessageChannel instance is a DirectChannel object, so the sending flow described earlier is reused: the message is distributed to a MessageHandler through the message dispatching class MessageDispatcher.
The message handler that corresponds to this DirectChannel is StreamListenerMessageHandler:
public class StreamListenerMessageHandler extends AbstractReplyProducingMessageHandler {

    private final InvocableHandlerMethod invocableHandlerMethod;

    private final boolean copyHeaders;

    StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod,
            boolean copyHeaders, String[] notPropagatedHeaders) {
        super();
        this.invocableHandlerMethod = invocableHandlerMethod;
        this.copyHeaders = copyHeaders;
        this.setNotPropagatedHeaders(notPropagatedHeaders);
    }

    @Override
    protected boolean shouldCopyRequestHeaders() {
        return this.copyHeaders;
    }

    public boolean isVoid() {
        return this.invocableHandlerMethod.isVoid();
    }

    @Override
    protected Object handleRequestMessage(Message<?> requestMessage) {
        try {
            // Delegate to the listener method wrapped by InvocableHandlerMethod.
            return this.invocableHandlerMethod.invoke(requestMessage);
        }
        catch (Exception e) {
            if (e instanceof MessagingException) {
                throw (MessagingException) e;
            }
            else {
                throw new MessagingException(requestMessage,
                        "Exception thrown while invoking "
                                + this.invocableHandlerMethod.getShortLogMessage(), e);
            }
        }
    }
}
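InvocableHandlerMethod completes the callback using Java reflection. Each StreamListenerMessageHandler is bound to a method annotated with @StreamListener, and that binding is set up once the application context has created its singletons.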
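The reflective callback can be illustrated with the JDK's own java.lang.reflect API. This is only a rough sketch of the idea, not how InvocableHandlerMethod is implemented: OrderListener and ReflectiveHandler are hypothetical classes, and argument conversion is left out entirely.

```java
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

/** Hypothetical sketch of invoking a listener method reflectively. */
class ReflectiveHandlerSketch {

    /** Example listener bean; in an application this would be user code. */
    static class OrderListener {
        private String lastPayload;

        public String receive(String payload) {
            this.lastPayload = payload;
            return "ack:" + payload;
        }

        String getLastPayload() {
            return this.lastPayload;
        }
    }

    /** Pairs a bean with one of its methods, looked up once and invoked per message. */
    static class ReflectiveHandler {
        private final Object bean;
        private final Method method;

        ReflectiveHandler(Object bean, String methodName, Class<?> parameterType) {
            this.bean = bean;
            try {
                this.method = bean.getClass().getMethod(methodName, parameterType);
            }
            catch (NoSuchMethodException e) {
                throw new IllegalArgumentException("No such listener method: " + methodName, e);
            }
        }

        Object handle(Object payload) {
            try {
                return this.method.invoke(this.bean, payload);
            }
            catch (IllegalAccessException | InvocationTargetException e) {
                throw new IllegalStateException(
                        "Exception thrown while invoking " + this.method.getName(), e);
            }
        }
    }
}
```

Looking the method up once and reusing the Method object per message keeps the per-message cost to a single invoke call.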
StreamListenerAnnotationBeanPostProcessor afterSingletonsInstantiated method:
@Override
public final void afterSingletonsInstantiated() {
    this.injectAndPostProcessDependencies();
    EvaluationContext evaluationContext = IntegrationContextUtils
            .getEvaluationContext(this.applicationContext.getBeanFactory());
    for (Map.Entry<String, List<StreamListenerHandlerMethodMapping>> mappedBindingEntry : this.mappedListenerMethods
            .entrySet()) {
        ArrayList<DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper> handlers;
        handlers = new ArrayList<>();
        for (StreamListenerHandlerMethodMapping mapping : mappedBindingEntry
                .getValue()) {
            final InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory
                    .createInvocableHandlerMethod(mapping.getTargetBean(),
                            checkProxy(mapping.getMethod(), mapping.getTargetBean()));
            StreamListenerMessageHandler streamListenerMessageHandler = new StreamListenerMessageHandler(
                    invocableHandlerMethod,
                    resolveExpressionAsBoolean(mapping.getCopyHeaders(),
                            "copyHeaders"),
                    this.springIntegrationProperties
                            .getMessageHandlerNotPropagatedHeaders());
            streamListenerMessageHandler
                    .setApplicationContext(this.applicationContext);
            streamListenerMessageHandler
                    .setBeanFactory(this.applicationContext.getBeanFactory());
            if (StringUtils.hasText(mapping.getDefaultOutputChannel())) {
                streamListenerMessageHandler
                        .setOutputChannelName(mapping.getDefaultOutputChannel());
            }
            streamListenerMessageHandler.afterPropertiesSet();
            if (StringUtils.hasText(mapping.getCondition())) {
                String conditionAsString = resolveExpressionAsString(
                        mapping.getCondition(), "condition");
                Expression condition = SPEL_EXPRESSION_PARSER
                        .parseExpression(conditionAsString);
                handlers.add(
                        new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
                                condition, streamListenerMessageHandler));
            }
            else {
                handlers.add(
                        new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
                                null, streamListenerMessageHandler));
            }
        }
        if (handlers.size() > 1) {
            for (DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper handler : handlers) {
                Assert.isTrue(handler.isVoid(),
                        StreamListenerErrorMessages.MULTIPLE_VALUE_RETURNING_METHODS);
            }
        }
        AbstractReplyProducingMessageHandler handler;
        if (handlers.size() > 1 || handlers.get(0).getCondition() != null) {
            handler = new DispatchingStreamListenerMessageHandler(handlers,
                    evaluationContext);
        }
        else {
            handler = handlers.get(0).getStreamListenerMessageHandler();
        }
        handler.setApplicationContext(this.applicationContext);
        handler.setChannelResolver(this.binderAwareChannelResolver);
        handler.afterPropertiesSet();
        this.applicationContext.getBeanFactory().registerSingleton(
                handler.getClass().getSimpleName() + handler.hashCode(), handler);
        this.applicationContext
                .getBean(mappedBindingEntry.getKey(), SubscribableChannel.class)
                .subscribe(handler);
    }
    this.mappedListenerMethods.clear();
}
After all singleton objects managed by the Spring container have been initialized, this method iterates over the StreamListenerHandlerMethodMapping entries and creates and initializes an InvocableHandlerMethod and a StreamListenerMessageHandler for each of them.
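When a binding has more than one listener method, or a single one that carries a condition, the code above wraps the handlers in a DispatchingStreamListenerMessageHandler. The idea can be sketched with plain Java predicates. Everything below is hypothetical: the class names are made up for illustration, predicates stand in for the SpEL expressions parsed in the real code, and calling every matching handler is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

/** Hypothetical sketch of condition-based dispatch; plain predicates stand in for SpEL. */
class ConditionalDispatchSketch {

    /** A handler plus an optional condition; a null condition always matches. */
    static class ConditionalHandler {
        private final Predicate<String> condition;
        private final Consumer<String> handler;

        ConditionalHandler(Predicate<String> condition, Consumer<String> handler) {
            this.condition = condition;
            this.handler = handler;
        }

        boolean matches(String payload) {
            return this.condition == null || this.condition.test(payload);
        }
    }

    private final List<ConditionalHandler> handlers = new ArrayList<>();

    void register(Predicate<String> condition, Consumer<String> handler) {
        this.handlers.add(new ConditionalHandler(condition, handler));
    }

    /** Invokes every handler whose condition matches; returns how many ran. */
    int dispatch(String payload) {
        int invoked = 0;
        for (ConditionalHandler candidate : this.handlers) {
            if (candidate.matches(payload)) {
                candidate.handler.accept(payload);
                invoked++;
            }
        }
        return invoked;
    }
}
```

A handler registered with a null condition mirrors the wrapper created with a null condition in the listing above.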
StreamListenerHandlerMethodMapping stores the mapping between a StreamListener and its handler method. That mapping is created in the postProcessAfterInitialization() method of StreamListenerAnnotationBeanPostProcessor:
@Override
public final Object postProcessAfterInitialization(Object bean, final String beanName)
        throws BeansException {
    Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean)
            : bean.getClass();
    Method[] uniqueDeclaredMethods = ReflectionUtils
            .getUniqueDeclaredMethods(targetClass);
    for (Method method : uniqueDeclaredMethods) {
        StreamListener streamListener = AnnotatedElementUtils
                .findMergedAnnotation(method, StreamListener.class);
        if (streamListener != null && !method.isBridge()) {
            this.streamListenerCallbacks.add(() -> {
                Assert.isTrue(method.getAnnotation(Input.class) == null,
                        StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
                this.doPostProcess(streamListener, method, bean);
            });
        }
    }
    return bean;
}

private void doPostProcess(StreamListener streamListener, Method method, Object bean) {
    streamListener = postProcessAnnotation(streamListener, method);
    Optional<StreamListenerSetupMethodOrchestrator> orchestratorOptional;
    orchestratorOptional = this.streamListenerSetupMethodOrchestrators.stream()
            .filter(t -> t.supports(method)).findFirst();
    Assert.isTrue(orchestratorOptional.isPresent(),
            "A matching StreamListenerSetupMethodOrchestrator must be present");
    StreamListenerSetupMethodOrchestrator streamListenerSetupMethodOrchestrator = orchestratorOptional
            .get();
    streamListenerSetupMethodOrchestrator
            .orchestrateStreamListenerSetupMethod(streamListener, method, bean);
}

@Override
public void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean) {
    String methodAnnotatedInboundName = streamListener.value();
    String methodAnnotatedOutboundName = StreamListenerMethodUtils
            .getOutboundBindingTargetName(method);
    int inputAnnotationCount = StreamListenerMethodUtils
            .inputAnnotationCount(method);
    int outputAnnotationCount = StreamListenerMethodUtils
            .outputAnnotationCount(method);
    boolean isDeclarative = checkDeclarativeMethod(method,
            methodAnnotatedInboundName, methodAnnotatedOutboundName);
    StreamListenerMethodUtils.validateStreamListenerMethod(method,
            inputAnnotationCount, outputAnnotationCount,
            methodAnnotatedInboundName, methodAnnotatedOutboundName,
            isDeclarative, streamListener.condition());
    if (isDeclarative) {
        StreamListenerParameterAdapter[] toSlpaArray;
        toSlpaArray = new StreamListenerParameterAdapter[this.streamListenerParameterAdapters
                .size()];
        Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(
                method, methodAnnotatedInboundName, this.applicationContext,
                this.streamListenerParameterAdapters.toArray(toSlpaArray));
        invokeStreamListenerResultAdapter(method, bean,
                methodAnnotatedOutboundName, adaptedInboundArguments);
    }
    else {
        registerHandlerMethodOnListenedChannel(method, streamListener, bean);
    }
}

private void registerHandlerMethodOnListenedChannel(Method method, StreamListener streamListener, Object bean) {
    Assert.hasText(streamListener.value(), "The binding name cannot be null");
    if (!StringUtils.hasText(streamListener.value())) {
        throw new BeanInitializationException(
                "A bound component name must be specified");
    }
    final String defaultOutputChannel = StreamListenerMethodUtils
            .getOutboundBindingTargetName(method);
    if (Void.TYPE.equals(method.getReturnType())) {
        Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel),
                "An output channel cannot be specified for a method that does not return a value");
    }
    else {
        Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel),
                "An output channel must be specified for a method that can return a value");
    }
    StreamListenerMethodUtils.validateStreamListenerMessageHandler(method);
    StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add(
            streamListener.value(),
            new StreamListenerHandlerMethodMapping(bean, method,
                    streamListener.condition(), defaultOutputChannel,
                    streamListener.copyHeaders()));
}
The call to StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add creates and saves the StreamListenerHandlerMethodMapping.
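The scan-and-register steps in the listing above can be approximated with plain JDK reflection. This is a simplified sketch under stated assumptions: the @Listen annotation, the Mapping class, and DemoBean are all hypothetical, and the sketch skips the proxy handling, validation, and deferred callbacks that the real post-processor performs.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of collecting annotated listener methods per binding name. */
class ListenerScanSketch {

    /** Stand-in for a listener annotation; value() names the binding. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Listen {
        String value();
    }

    /** Records which method on which bean handles a binding. */
    static class Mapping {
        final Object bean;
        final Method method;

        Mapping(Object bean, Method method) {
            this.bean = bean;
            this.method = method;
        }
    }

    /** Example bean with one annotated and one plain method. */
    static class DemoBean {
        @Listen("input")
        public void onOrder(String payload) {
        }

        public void helper() {
        }
    }

    /** Groups the annotated, non-bridge methods of a bean by binding name. */
    static Map<String, List<Mapping>> scan(Object bean) {
        Map<String, List<Mapping>> mappings = new LinkedHashMap<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Listen listen = method.getAnnotation(Listen.class);
            if (listen != null && !method.isBridge()) {
                mappings.computeIfAbsent(listen.value(), key -> new ArrayList<>())
                        .add(new Mapping(bean, method));
            }
        }
        return mappings;
    }
}
```

Keeping a list per binding name is what later allows several listener methods on the same binding to be combined into one dispatching handler.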
This completes the walkthrough of consuming RocketMQ messages through the Spring Cloud Stream message model. Alternatively, RocketMQ can be used through its Spring Boot integration component.