RocketMQ Binder integrates message subscription

The AbstractMessageChannelBinder class defines the contract for creating a MessageProducer: when the Binder is initialized, it calls the createConsumerEndpoint method.

RocketMQMessageChannelBinder creates and initializes the RocketMQInboundChannelAdapter.

The createConsumerEndpoint method of RocketMQMessageChannelBinder:

@Override
protected MessageProducer createConsumerEndpoint(ConsumerDestination destination,
      String group,
      ExtendedConsumerProperties<RocketMQConsumerProperties> consumerProperties)
      throws Exception {
   if (group == null || "".equals(group)) {
      throw new RuntimeException(
            "'group must be configured for channel " + destination.getName());
   }

   RocketMQListenerBindingContainer listenerContainer = new RocketMQListenerBindingContainer(
         consumerProperties, rocketBinderConfigurationProperties, this);
   listenerContainer.setConsumerGroup(group);
   listenerContainer.setTopic(destination.getName());
   listenerContainer.setConsumeThreadMax(consumerProperties.getConcurrency());
   listenerContainer.setSuspendCurrentQueueTimeMillis(
         consumerProperties.getExtension().getSuspendCurrentQueueTimeMillis());
   listenerContainer.setDelayLevelWhenNextConsume(
         consumerProperties.getExtension().getDelayLevelWhenNextConsume());
   listenerContainer
         .setNameServer(rocketBinderConfigurationProperties.getNameServer());
   listenerContainer.setHeaderMapper(createHeaderMapper(consumerProperties));

   RocketMQInboundChannelAdapter rocketInboundChannelAdapter = new RocketMQInboundChannelAdapter(
         listenerContainer, consumerProperties, instrumentationManager);

   topicInUse.put(destination.getName(), group);

   ErrorInfrastructure errorInfrastructure = registerErrorInfrastructure(destination,
         group, consumerProperties);
   if (consumerProperties.getMaxAttempts() > 1) {
      rocketInboundChannelAdapter
            .setRetryTemplate(buildRetryTemplate(consumerProperties));
      rocketInboundChannelAdapter
            .setRecoveryCallback(errorInfrastructure.getRecoverer());
   }
   else {
      rocketInboundChannelAdapter
            .setErrorChannel(errorInfrastructure.getErrorChannel());
   }

   return rocketInboundChannelAdapter;
}

RocketMQInboundChannelAdapter is the adapter. It has to adapt the Spring Framework retry and callback mechanisms, and it is used to subscribe to messages and convert the message format. RocketMQListenerBindingContainer wraps the RocketMQ client API, and the adapter holds a reference to it.
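The retry-or-recover choice made at the end of createConsumerEndpoint can be illustrated with a minimal plain-Java sketch. It does not use Spring Retry: SimpleRetry and its execute method are hypothetical names that only mimic the idea of a retry template paired with a recovery callback, where the action is attempted up to maxAttempts times and the last failure is handed to the recoverer once the attempts are exhausted.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Hypothetical stand-in for a retry template; this is NOT the Spring Retry API.
final class SimpleRetry {
    private final int maxAttempts;

    SimpleRetry(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    // Runs the action up to maxAttempts times. If every attempt fails,
    // the last exception is passed to the recovery callback instead of
    // being rethrown to the caller.
    <T> T execute(Callable<T> action, Function<Exception, T> recovery) {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return action.call();
            } catch (Exception e) {
                last = e; // remember the failure and try again
            }
        }
        return recovery.apply(last);
    }
}
```

With new SimpleRetry(3), an action that always throws is invoked three times before the recovery function runs; an action that succeeds on the first call never reaches the recovery function.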

RocketMQ provides two consumption modes: orderly consumption and concurrent consumption. On the RocketMQ client side, the default listener for orderly consumption is the DefaultMessageListenerOrderly class, and the default listener for concurrent consumption is the DefaultMessageListenerConcurrently class. Whichever mode is used, every message the listener receives is passed on to the RocketMQListener callback.
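As a rough illustration of "two modes, one callback", the sketch below delivers a batch either one message at a time or through a thread pool, and in both cases ends up calling the same listener. SimpleListenerContainer, its orderly flag, and the pool size of 4 are assumptions made for this example; real RocketMQ orderly consumption works per message queue with locking, which is not modeled here.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical container; this is NOT the RocketMQ client API.
final class SimpleListenerContainer {
    private final boolean orderly;
    private final Consumer<String> listener;

    SimpleListenerContainer(boolean orderly, Consumer<String> listener) {
        this.orderly = orderly;
        this.listener = listener;
    }

    // Delivers a batch and returns once every message has reached the listener.
    void deliver(List<String> batch) {
        if (orderly) {
            // Orderly mode: one message at a time, in arrival order.
            for (String message : batch) {
                listener.accept(message);
            }
            return;
        }
        // Concurrent mode: messages may be handled in parallel, in any order.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (String message : batch) {
            pool.execute(() -> listener.accept(message));
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
    }
}
```

In orderly mode the listener sees the messages in the order of the batch; in concurrent mode it sees all of them, but the order is not guaranteed, so the listener must be thread-safe.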

The RocketMQListener implementation class is created and initialized in RocketMQInboundChannelAdapter.

RocketMQInboundChannelAdapter onInit method and BindingRocketMQListener inner class:

@Override
protected void onInit() {
   if (consumerProperties == null
         || !consumerProperties.getExtension().getEnabled()) {
      return;
   }
   super.onInit();
   if (this.retryTemplate != null) {
      Assert.state(getErrorChannel() == null,
            "Cannot have an 'errorChannel' property when a 'RetryTemplate' is "
                  + "provided; use an 'ErrorMessageSendingRecoverer' in the 'recoveryCallback' property to "
                  + "send an error message when retries are exhausted");
   }

   BindingRocketMQListener listener = new BindingRocketMQListener();
   rocketMQListenerContainer.setRocketMQListener(listener);

   if (retryTemplate != null) {
      this.retryTemplate.registerListener(listener);
   }

   try {
      rocketMQListenerContainer.afterPropertiesSet();

   }
   catch (Exception e) {
      log.error("rocketMQListenerContainer init error: " + e.getMessage(), e);
      throw new IllegalArgumentException(
            "rocketMQListenerContainer init error: " + e.getMessage(), e);
   }

   instrumentationManager.addHealthInstrumentation(
         new Instrumentation(rocketMQListenerContainer.getTopic()
               + rocketMQListenerContainer.getConsumerGroup()));
}

protected class BindingRocketMQListener
			implements RocketMQListener<Message>, RetryListener {

		@Override
		public void onMessage(Message message) {
			boolean enableRetry = RocketMQInboundChannelAdapter.this.retryTemplate != null;
			if (enableRetry) {
				RocketMQInboundChannelAdapter.this.retryTemplate.execute(context -> {
					RocketMQInboundChannelAdapter.this.sendMessage(message);
					return null;
				}, (RecoveryCallback<Object>) RocketMQInboundChannelAdapter.this.recoveryCallback);
			}
			else {
				RocketMQInboundChannelAdapter.this.sendMessage(message);
			}
		}

		@Override
		public <T, E extends Throwable> boolean open(RetryContext context,
				RetryCallback<T, E> callback) {
			return true;
		}

		@Override
		public <T, E extends Throwable> void close(RetryContext context,
				RetryCallback<T, E> callback, Throwable throwable) {
		}

		@Override
		public <T, E extends Throwable> void onError(RetryContext context,
				RetryCallback<T, E> callback, Throwable throwable) {
		}
}

When RocketMQ's DefaultMessageListenerOrderly receives a message, it first calls back the onMessage method of BindingRocketMQListener, which in turn calls the sendMessage method that RocketMQInboundChannelAdapter inherits from its parent class to send the message to the DirectChannel.

Spring Cloud Stream uses the same model for receiving messages as for sending them. A message received in the Binder is first sent to a MessageChannel, and the channel's Dispatcher then forwards it to the subscribed MessageHandler for processing.
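The receive path described above (adapter, then channel, then handler) can be sketched in a few lines of plain Java. All three types below are hypothetical and heavily simplified; they are not the Spring Integration classes. The channel here hands each message to exactly one subscriber, chosen round-robin, on the caller's thread, and fails when nobody is subscribed, which is an assumption about how a direct, point-to-point channel behaves.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified types; NOT the Spring Integration API.
interface SimpleHandler {
    void handle(String payload);
}

final class SimpleDirectChannel {
    private final List<SimpleHandler> handlers = new CopyOnWriteArrayList<>();
    private final AtomicInteger next = new AtomicInteger();

    void subscribe(SimpleHandler handler) {
        handlers.add(handler);
    }

    // Dispatches to one subscriber on the caller's thread (round-robin).
    void send(String payload) {
        if (handlers.isEmpty()) {
            throw new IllegalStateException("no subscribers for this channel");
        }
        int index = Math.floorMod(next.getAndIncrement(), handlers.size());
        handlers.get(index).handle(payload);
    }
}

// Plays the role of the inbound adapter: whatever the broker listener
// hands over is pushed into the output channel.
final class SimpleInboundAdapter {
    private final SimpleDirectChannel outputChannel;

    SimpleInboundAdapter(SimpleDirectChannel outputChannel) {
        this.outputChannel = outputChannel;
    }

    void onMessage(String payload) {
        outputChannel.send(payload);
    }
}
```

Subscribing a handler to the channel and then calling onMessage on the adapter is enough to see the payload arrive in the handler.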

The MessageChannel returned by getOutputChannel() of MessageProducerSupport, the parent class of RocketMQInboundChannelAdapter, is the DirectChannel that was passed in when the RocketMQ Binder was initialized.

MessageProducerSupport getOutputChannel method:

@Override
public MessageChannel getOutputChannel() {
   if (this.outputChannelName != null) {
      synchronized (this) {
         if (this.outputChannelName != null) {
            this.outputChannel = getChannelResolver().resolveDestination(this.outputChannelName);
            this.outputChannelName = null;
         }
      }
   }
   return this.outputChannel;
}

MessagingTemplate extends the GenericMessagingTemplate class, and it is the doSend() method that actually sends the message.

The MessageChannel instance is a DirectChannel object, so the message-sending flow described earlier is reused: the message dispatcher class MessageDispatcher distributes messages to the MessageHandler.

The message handler for the DirectChannel is StreamListenerMessageHandler:

public class StreamListenerMessageHandler extends AbstractReplyProducingMessageHandler {

   private final InvocableHandlerMethod invocableHandlerMethod;

   private final boolean copyHeaders;

   StreamListenerMessageHandler(InvocableHandlerMethod invocableHandlerMethod,
         boolean copyHeaders, String[] notPropagatedHeaders) {
      super();
      this.invocableHandlerMethod = invocableHandlerMethod;
      this.copyHeaders = copyHeaders;
      this.setNotPropagatedHeaders(notPropagatedHeaders);
   }

   @Override
   protected boolean shouldCopyRequestHeaders() {
      return this.copyHeaders;
   }

   public boolean isVoid() {
      return this.invocableHandlerMethod.isVoid();
   }

   @Override
   protected Object handleRequestMessage(Message<?> requestMessage) {
      try {
         return this.invocableHandlerMethod.invoke(requestMessage);
      }
      catch (Exception e) {
         if (e instanceof MessagingException) {
            throw (MessagingException) e;
         }
         else {
            throw new MessagingException(requestMessage,
                  "Exception thrown while invoking "
                        + this.invocableHandlerMethod.getShortLogMessage(), e);
         }
      }
   }
}

InvocableHandlerMethod completes the callback through the Java reflection mechanism; StreamListenerMessageHandler wraps it to invoke the method annotated with @StreamListener.
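To show what "callback through reflection" means in practice, here is a small sketch built only on java.lang.reflect. The @OnMessage annotation, ReflectiveHandler, and OrderService are hypothetical names invented for the example; they stand in for @StreamListener, the handler, and a user bean, and do not reproduce the Spring classes.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;

// Hypothetical marker annotation standing in for @StreamListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OnMessage {
    String value();
}

// Hypothetical handler that wraps a target bean and one of its methods.
final class ReflectiveHandler {
    private final Object bean;
    private final Method method;

    private ReflectiveHandler(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
        this.method.setAccessible(true);
    }

    // Looks the method up by name; wraps the checked exception for brevity.
    static ReflectiveHandler forMethod(Object bean, String name, Class<?>... parameterTypes) {
        try {
            return new ReflectiveHandler(bean, bean.getClass().getDeclaredMethod(name, parameterTypes));
        } catch (NoSuchMethodException e) {
            throw new IllegalArgumentException("No such method: " + name, e);
        }
    }

    // Invokes the wrapped method reflectively and returns its result.
    Object handle(Object payload) {
        try {
            return method.invoke(bean, payload);
        } catch (InvocationTargetException e) {
            throw new IllegalStateException(
                    "Exception thrown while invoking " + method.getName(), e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Cannot access " + method.getName(), e);
        }
    }
}

// Example bean with one annotated listener method.
final class OrderService {
    @OnMessage("input")
    String receive(String payload) {
        return "received " + payload;
    }
}
```

Calling ReflectiveHandler.forMethod(new OrderService(), "receive", String.class).handle("order-1") ends up in OrderService.receive without the handler knowing the bean's type at compile time.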

StreamListenerAnnotationBeanPostProcessor afterSingletonsInstantiated method:

@Override
public final void afterSingletonsInstantiated() {
   this.injectAndPostProcessDependencies();
   EvaluationContext evaluationContext = IntegrationContextUtils
         .getEvaluationContext(this.applicationContext.getBeanFactory());
   for (Map.Entry<String, List<StreamListenerHandlerMethodMapping>> mappedBindingEntry : this.mappedListenerMethods
         .entrySet()) {
      ArrayList<DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper> handlers;
      handlers = new ArrayList<>();
      for (StreamListenerHandlerMethodMapping mapping : mappedBindingEntry
            .getValue()) {
         final InvocableHandlerMethod invocableHandlerMethod = this.messageHandlerMethodFactory
               .createInvocableHandlerMethod(mapping.getTargetBean(),
                     checkProxy(mapping.getMethod(), mapping.getTargetBean()));
         StreamListenerMessageHandler streamListenerMessageHandler = new StreamListenerMessageHandler(
               invocableHandlerMethod,
               resolveExpressionAsBoolean(mapping.getCopyHeaders(),
                     "copyHeaders"),
               this.springIntegrationProperties
                     .getMessageHandlerNotPropagatedHeaders());
         streamListenerMessageHandler
               .setApplicationContext(this.applicationContext);
         streamListenerMessageHandler
               .setBeanFactory(this.applicationContext.getBeanFactory());
         if (StringUtils.hasText(mapping.getDefaultOutputChannel())) {
            streamListenerMessageHandler
                  .setOutputChannelName(mapping.getDefaultOutputChannel());
         }
         streamListenerMessageHandler.afterPropertiesSet();
         if (StringUtils.hasText(mapping.getCondition())) {
            String conditionAsString = resolveExpressionAsString(
                  mapping.getCondition(), "condition");
            Expression condition = SPEL_EXPRESSION_PARSER
                  .parseExpression(conditionAsString);
            handlers.add(
                  new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
                        condition, streamListenerMessageHandler));
         }
         else {
            handlers.add(
                  new DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper(
                        null, streamListenerMessageHandler));
         }
      }
      if (handlers.size() > 1) {
         for (DispatchingStreamListenerMessageHandler.ConditionalStreamListenerMessageHandlerWrapper handler : handlers) {
            Assert.isTrue(handler.isVoid(),
                  StreamListenerErrorMessages.MULTIPLE_VALUE_RETURNING_METHODS);
         }
      }
      AbstractReplyProducingMessageHandler handler;

      if (handlers.size() > 1 || handlers.get(0).getCondition() != null) {
         handler = new DispatchingStreamListenerMessageHandler(handlers,
               evaluationContext);
      }
      else {
         handler = handlers.get(0).getStreamListenerMessageHandler();
      }
      handler.setApplicationContext(this.applicationContext);
      handler.setChannelResolver(this.binderAwareChannelResolver);
      handler.afterPropertiesSet();
      this.applicationContext.getBeanFactory().registerSingleton(
            handler.getClass().getSimpleName() + handler.hashCode(), handler);
      this.applicationContext
            .getBean(mappedBindingEntry.getKey(), SubscribableChannel.class)
            .subscribe(handler);
   }
   this.mappedListenerMethods.clear();
}

After all singleton objects managed by the Spring container have been initialized, this method iterates over the StreamListenerHandlerMethodMapping entries and creates and initializes an InvocableHandlerMethod and a StreamListenerMessageHandler for each of them.
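The branch near the end of afterSingletonsInstantiated (use the single handler directly, or wrap several conditional handlers in a dispatching handler) can be sketched as follows. ConditionalHandler, DispatchingHandler, and HandlerFactory are hypothetical names, the condition is a plain Predicate instead of a SpEL expression, and the sketch assumes the dispatching handler invokes every handler whose condition matches.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical pairing of an optional condition with a delegate handler.
final class ConditionalHandler {
    final Predicate<String> condition; // null means "always matches"
    final Consumer<String> delegate;

    ConditionalHandler(Predicate<String> condition, Consumer<String> delegate) {
        this.condition = condition;
        this.delegate = delegate;
    }
}

// Hypothetical dispatcher: calls every delegate whose condition matches.
final class DispatchingHandler implements Consumer<String> {
    private final List<ConditionalHandler> handlers;

    DispatchingHandler(List<ConditionalHandler> handlers) {
        this.handlers = new ArrayList<>(handlers);
    }

    @Override
    public void accept(String payload) {
        for (ConditionalHandler handler : handlers) {
            if (handler.condition == null || handler.condition.test(payload)) {
                handler.delegate.accept(payload);
            }
        }
    }
}

final class HandlerFactory {
    private HandlerFactory() {
    }

    // Mirrors the branch in the listing: a dispatcher is only needed when
    // there are several handlers or the single handler has a condition.
    static Consumer<String> build(List<ConditionalHandler> handlers) {
        if (handlers.isEmpty()) {
            throw new IllegalArgumentException("at least one handler is required");
        }
        if (handlers.size() > 1 || handlers.get(0).condition != null) {
            return new DispatchingHandler(handlers);
        }
        return handlers.get(0).delegate;
    }
}
```

With one unconditional handler, build returns that handler unchanged; with two handlers, a payload reaches only those whose condition accepts it.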

StreamListenerHandlerMethodMapping stores the mapping between a StreamListener and its InvocableHandlerMethod. The mapping is created in the postProcessAfterInitialization() method of StreamListenerAnnotationBeanPostProcessor:

@Override
public final Object postProcessAfterInitialization(Object bean, final String beanName)
      throws BeansException {
   Class<?> targetClass = AopUtils.isAopProxy(bean) ? AopUtils.getTargetClass(bean)
         : bean.getClass();
   Method[] uniqueDeclaredMethods = ReflectionUtils
         .getUniqueDeclaredMethods(targetClass);
   for (Method method : uniqueDeclaredMethods) {
      StreamListener streamListener = AnnotatedElementUtils
            .findMergedAnnotation(method, StreamListener.class);
      if (streamListener != null && !method.isBridge()) {
         this.streamListenerCallbacks.add(() -> {
            Assert.isTrue(method.getAnnotation(Input.class) == null,
                  StreamListenerErrorMessages.INPUT_AT_STREAM_LISTENER);
            this.doPostProcess(streamListener, method, bean);
         });
      }
   }
   return bean;
}

private void doPostProcess(StreamListener streamListener, Method method, Object bean) {
		streamListener = postProcessAnnotation(streamListener, method);
		Optional<StreamListenerSetupMethodOrchestrator> orchestratorOptional;
		orchestratorOptional = this.streamListenerSetupMethodOrchestrators.stream()
				.filter(t -> t.supports(method)).findFirst();
		Assert.isTrue(orchestratorOptional.isPresent(),
				"A matching StreamListenerSetupMethodOrchestrator must be present");
		StreamListenerSetupMethodOrchestrator streamListenerSetupMethodOrchestrator = orchestratorOptional
				.get();
		streamListenerSetupMethodOrchestrator
				.orchestrateStreamListenerSetupMethod(streamListener, method, bean);
	}

@Override
		public void orchestrateStreamListenerSetupMethod(StreamListener streamListener, Method method, Object bean) {
			String methodAnnotatedInboundName = streamListener.value();

			String methodAnnotatedOutboundName = StreamListenerMethodUtils
					.getOutboundBindingTargetName(method);
			int inputAnnotationCount = StreamListenerMethodUtils
					.inputAnnotationCount(method);
			int outputAnnotationCount = StreamListenerMethodUtils
					.outputAnnotationCount(method);
			boolean isDeclarative = checkDeclarativeMethod(method,
					methodAnnotatedInboundName, methodAnnotatedOutboundName);
			StreamListenerMethodUtils.validateStreamListenerMethod(method,
					inputAnnotationCount, outputAnnotationCount,
					methodAnnotatedInboundName, methodAnnotatedOutboundName,
					isDeclarative, streamListener.condition());
			if (isDeclarative) {
				StreamListenerParameterAdapter[] toSlpaArray;
				toSlpaArray = new StreamListenerParameterAdapter[this.streamListenerParameterAdapters
						.size()];
				Object[] adaptedInboundArguments = adaptAndRetrieveInboundArguments(
						method, methodAnnotatedInboundName, this.applicationContext,
						this.streamListenerParameterAdapters.toArray(toSlpaArray));
				invokeStreamListenerResultAdapter(method, bean,
						methodAnnotatedOutboundName, adaptedInboundArguments);
			}
			else {
				registerHandlerMethodOnListenedChannel(method, streamListener, bean);
			}
		}

		private void registerHandlerMethodOnListenedChannel(Method method,
				StreamListener streamListener, Object bean) {
			Assert.hasText(streamListener.value(), "The binding name cannot be null");
			if (!StringUtils.hasText(streamListener.value())) {
				throw new BeanInitializationException(
						"A bound component name must be specified");
			}
			final String defaultOutputChannel = StreamListenerMethodUtils
					.getOutboundBindingTargetName(method);
			if (Void.TYPE.equals(method.getReturnType())) {
				Assert.isTrue(StringUtils.isEmpty(defaultOutputChannel),
						"An output channel cannot be specified for a method that does not return a value");
			}
			else {
				Assert.isTrue(!StringUtils.isEmpty(defaultOutputChannel),
						"An output channel must be specified for a method that can return a value");
			}
			StreamListenerMethodUtils.validateStreamListenerMessageHandler(method);
			StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add(
					streamListener.value(),
					new StreamListenerHandlerMethodMapping(bean, method,
							streamListener.condition(), defaultOutputChannel,
							streamListener.copyHeaders()));
		}

The call to StreamListenerAnnotationBeanPostProcessor.this.mappedListenerMethods.add(...) creates and saves the StreamListenerHandlerMethodMapping.
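The scan-and-register step can be condensed into a small sketch: walk a bean's declared methods, pick those carrying the listener annotation, and store one mapping per method under the channel name. The @Listen annotation, ListenerMapping, ListenerRegistry, and PaymentService are hypothetical names for illustration; the real post-processor also handles proxies, conditions, and output channels, which are left out here.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical annotation standing in for @StreamListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listen {
    String value();
}

// Hypothetical record of "which method on which bean handles a channel".
final class ListenerMapping {
    final Object bean;
    final Method method;

    ListenerMapping(Object bean, Method method) {
        this.bean = bean;
        this.method = method;
    }
}

final class ListenerRegistry {
    // Channel name -> every listener method registered for that channel.
    private final Map<String, List<ListenerMapping>> mappings = new LinkedHashMap<>();

    // Scans the bean's declared methods and records each annotated one.
    void register(Object bean) {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            Listen listen = method.getAnnotation(Listen.class);
            if (listen == null || method.isBridge()) {
                continue;
            }
            if (listen.value().isEmpty()) {
                throw new IllegalArgumentException("The binding name cannot be empty");
            }
            mappings.computeIfAbsent(listen.value(), key -> new ArrayList<>())
                    .add(new ListenerMapping(bean, method));
        }
    }

    List<ListenerMapping> mappingsFor(String channel) {
        return mappings.getOrDefault(channel, Collections.<ListenerMapping>emptyList());
    }
}

// Example bean: only onPayment is picked up by the scan.
final class PaymentService {
    @Listen("input")
    void onPayment(String payload) {
    }

    void helper() {
    }
}
```

After registry.register(new PaymentService()), mappingsFor("input") holds a single entry for onPayment, while helper is ignored because it has no annotation.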

That is how RocketMQ is used through the Spring Cloud Stream message model. Alternatively, you can use the RocketMQ component integrated with Spring Boot.