The author source | | los night alibaba cloud native public number

Spring Cloud Stream is used within the Spring Cloud architecture to build highly extensible event-driven microservices designed to simplify the development of messages in Spring Cloud applications.

Spring Cloud Stream has a lot of content, and it has many external dependencies. If you want to become familiar with SCS, Spring Messaging and Spring Integration are two projects that must be understood first. Next, the article will focus on the following three points:

  • What is Spring Messaging
  • What is Spring Integration
  • What is the SCS system and its principles

This article is accompanied by interactive tutorials. You have logged into ali Cloud Zhixing Hands-on laboratory, and logged into start.aliyun.com_ _ on PC to experience it immediately in a browser.

Spring Messaging

Spring Messaging is a module in the Spring Framework, whose function is to unify the message programming model.

  • For example, the model corresponding to Messaging includes a message body Payload and a message Header:

package org.springframework.messaging;
public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}
Copy the code
  • MessageChannel is used to receive messages. The send method can be called to send messages to this MessageChannel:

@FunctionalInterface public interface MessageChannel { long INDEFINITE_TIMEOUT = -1; default boolean send(Message<? > message) { return send(message, INDEFINITE_TIMEOUT); } boolean send(Message<? > message, long timeout); }Copy the code

How are messages consumed in a message channel?

  • Implemented by SubscribableChannel, a message channel subscribed to by the message channel’s subinterface, and subscribed by the MessageHandler MessageHandler:
public interface SubscribableChannel extends MessageChannel {
    boolean subscribe(MessageHandler handler);
    boolean unsubscribe(MessageHandler handler);
}
Copy the code
  • MessageHandler actually consumes/processes the message:
@FunctionalInterface public interface MessageHandler { void handleMessage(Message<? > message) throws MessagingException; }Copy the code

Some other functions are derived from the message model within Spring Messaging, such as:

  • Messages received parameters and return values: receive parameters processor HandlerMethodArgumentResolver cooperate @ Header, @ annotations use such as content; Message after receiving the return value of the processor HandlerMethodReturnValueHandler used with @ SendTo annotations;
  • Message body content converter MessageConverter;
  • Unified the abstract message template AbstractMessageSendingTemplate;
  • ChannelInterceptor;

Spring Integration

Spring Integration provides extensions to the Spring programming model to support Enterprise Integration Patterns and is an extension to Spring Messaging.

It puts forward many new concepts, including message routing MessageRoute, message distribution MessageDispatcher, message filtering Filter, message transformation Transformer, message aggregation Aggregator, message splitting Splitter and so on. MessageChannel and MessageHandler are also provided. It includes DirectChannel, ExecutorChannel, publishing subscribechannel and MessageFilter, ServiceActivatingHandler, and MethodInvokingSplitter And so on.

Here are a few ways to handle messages:

  • Message segmentation:

  • Aggregation of messages:

  • Message filtering:

  • Message distribution:

Next, let’s try Spring Integration with a simple example.

This code is interpreted as:

SubscribableChannel messageChannel =new DirectChannel(); // 1 messageChannel.subscribe(msg-> { // 2 System.out.println("receive: " +msg.getPayload()); }); messageChannel.send(MessageBuilder.withPayload("msgfrom alibaba").build()); / / 3Copy the code
  • Construct a subscribeable messageChannel messageChannel.
  • Use MessageHandler to consume messages in the message channel.
  • A message is sent to the message channel, which is ultimately consumed by MessageHandler in the message channel.
  • Finally, the console printed :receive: MSG from Alibaba.

DirectChannel has a UnicastingDispatcher message dispatcher that is sent to the corresponding MessageChannel MessageChannel. As the name indicates, UnicastingDispatcher is a unicast dispatcher. Only one message channel can be selected. So how to choose? The LoadBalancingStrategy load balancing policy is provided internally. By default, only polling is implemented and can be extended.

We modify the previous code a bit to use multiple MessageHandlers to handle messages:

SubscribableChannel messageChannel = new DirectChannel();

messageChannel.subscribe(msg -> {
     System.out.println("receive1: " + msg.getPayload());
});

messageChannel.subscribe(msg -> {
     System.out.println("receive2: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
Copy the code

Since the message dispatcher inside DirectChannel is unicast and adopts the load balancing strategy of polling, the two consumption here correspond to the two MessageHandlers respectively. The console prints:

receive1: msg from alibaba
receive2: msg from alibaba
Copy the code

Since there is a unicast message dispatcher, there must also be a broadcast message dispatcher, which is used by the PublishSubscribeChannel message channel. The broadcast message distributor distributes messages to all MessageHandlers:

SubscribableChannel messageChannel = new PublishSubscribeChannel();

messageChannel.subscribe(msg -> {
     System.out.println("receive1: " + msg.getPayload());
});

messageChannel.subscribe(msg -> {
     System.out.println("receive2: " + msg.getPayload());
});

messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
messageChannel.send(MessageBuilder.withPayload("msg from alibaba").build());
Copy the code

Spring Cloud Stream

The relationship between SCS and each module is as follows:

  • SCS encapsulates on the basis of Spring Integration, and puts forward concepts such as Binder, Binding, @enableBinding, and @StreamListener.
  • SCS is integrated with Spring Boot Actuator, providing/Bindings, /channelsendpoint.
  • SCS is integrated with Spring Boot Externalized Configuration, providing BindingProperties, BinderProperties and other Externalized Configuration classes.
  • SCS enhances processing logic in case of message sending failure and consumption failure.
  • SCS is the enhancement of Spring Integration and Integration with Spring Boot system, and is also the basis of Spring Cloud Bus. It shields the implementation details of the underlying message-oriented middleware and hopes to use a unified SET of APIS to send/consume messages. The implementation details of the underlying message-oriented middleware are completed by the binders of each message-oriented middleware.

Binders are components that provide integration with external message-oriented middleware and provide two methods for constructing bindings, bindConsumer and bindProducer, which are used to construct producers and consumers, respectively. Official implementations include Rabbit Binder and Kafka Binder, and RocketMQ Binder has been implemented internally by Spring Cloud Alibaba.

As you can see from the figure, Binding is a bridge between the application and the messaging middleware for the consumption and production of messages. Let’s look at a simple example using the RocketMQ Binder and examine its underlying processing principles:

  • Start sending classes and messages:
@SpringBootApplication @EnableBinding({ Source.class, Sink.class }) // 1 public class SendAndReceiveApplication { public static void main(String[] args) { SpringApplication.run(SendAndReceiveApplication.class, args); } @Bean // 2 public CustomRunner customRunner() { return new CustomRunner(); } public static class CustomRunner implements CommandLineRunner { @Autowired private Source source; @Override public void run(String... args) throws Exception { int count = 5; for (int index = 1; index <= count; index++) { source.output().send(MessageBuilder.withPayload("msg-" + index).build()); // 3}}}}Copy the code
  • Message reception:
@Service public class StreamListenerReceiveService { @StreamListener(Sink.INPUT) // 4 public void receiveByStreamListener1(String receiveMsg) { System.out.println("receiveByStreamListener: " + receiveMsg); }}Copy the code

This code is simple and does not involve rocketMQ-related code. Messages are sent and received based on the SCS system. If you want to switch to RabbitMQ or Kafka, you only need to change the configuration file, not the code.

Let’s examine how this code works:

The interface attributes Source and Sink corresponding to 1.@EnableBinding are provided by SCS. SCS internally constructs a BindableProxyFactory based on Source and Sink, and the MessageChannel returned by the corresponding Output and input methods is a DirectChannel. The value of the annotations modified by the output and input methods is the binding name in the configuration file.

public interface Source {
    String OUTPUT = "output";
    @Output(Source.OUTPUT)
    MessageChannel output();
}
public interface Sink {
    String INPUT = "input";
    @Input(Sink.INPUT)
    SubscribableChannel input();
}
Copy the code

The bindings name in the configuration file is output and input, corresponding to the value in the Source and Sink interface method:

spring.cloud.stream.bindings.output.destination=test-topic
spring.cloud.stream.bindings.output.content-type=text/plain
spring.cloud.stream.rocketmq.bindings.output.producer.group=demo-group

spring.cloud.stream.bindings.input.destination=test-topic
spring.cloud.stream.bindings.input.content-type=text/plain
spring.cloud.stream.bindings.input.group=test-group1
Copy the code
  1. Build CommandLineRunner, which executes the CustomRunner’s Run method when the program starts.

  2. Call the Output method in the Source interface to get the DirectChannel and send a message to the message channel. This is consistent with the previous code in the Spring Integration chapter.

  • The Source of the output after sending a message to the DirectChannel news channel will be AbstractMessageChannelBinder# SendingHandler the MessageHandler processing, And then it will be entrusted to AbstractMessageChannelBinder# createProducerMessageHandler create MessageHandler processing (the method) by a different message middleware implementation.
  • Different Message middleware corresponding AbstractMessageChannelBinder# createProducerMessageHandler method returns the MessageHandler inside the Spring Message is transformed into corresponding middleware The Message model is sent to the broker of the corresponding middleware.
  1. Subscribe to messages using @StreamListener. Please note that the value of sink. input in the annotation is “input”, which will be configured according to the value of binding for input in the configuration file:
  • Different message middleware corresponding AbstractMessageChannelBinder# createConsumerEndpoint method will use Consumer subscribe message, After subscribing to the Message, the Message model corresponding to the middleware is internally transformed into the Spring Message.
  • After the Message transformation, Spring Message is sent to the Message channel whose name is INPUT.
  • @ StreamListener corresponding StreamListenerMessageHandler subscribed to name for the message of the input channel, the news consumption.

This process is a bit verbose in text, but can be summarized in a diagram (the yellow sections relate to Binder implementations of various messaging middleware and MQ’s basic subscription and publishing capabilities) :

At the end of the SCS section, let’s take a look at the SCS code about how messages are handled:

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='1'")
public void receiveByHeader(Message msg) {
     System.out.println("receive by headers['index']=='1': " + msg);
}

@StreamListener(value = Sink.INPUT, condition = "headers['index']=='9999'")
public void receivePerson(@Payload Person person) {
     System.out.println("receive Person: " + person);
}

@StreamListener(value = Sink.INPUT)
public void receiveAllMsg(String msg) {
     System.out.println("receive allMsg by StreamListener. content: " + msg);
}

@StreamListener(value = Sink.INPUT)
public void receiveHeaderAndMsg(@Header("index") String index, Message msg) {
     System.out.println("receive by HeaderAndMsg by StreamListener. content: " + msg);
}
Copy the code

Notice how similar this code is to the code that receives requests in the Spring MVC Controller? In fact, their architecture is similar, Spring MVC for Controller Are the types of processing parameters and return values in org. Springframework. Web. Method. Support. HandlerMethodArgumentResolver, org. Springframework. Web. Method. The suppor Todd Harper andlerMethodReturnValueHandler.

The class for handling parameters and return values in Spring Messaging was mentioned earlier, Respectively is org. Springframework. Messaging. Handler. Invocation. HandlerMethodArgumentResolver, org. Springframework. Messaging. The handler. Invocation. HandlerMethodReturnValueHandler.

Their class names are identical, even the internal method names.

conclusion

The RocketMQ Binder Demos provide more examples of SCS and RocketMQ Binder, including message aggregation, partitioning, and filtering. Message exception handling; Message label, SQL filtering; Synchronous, asynchronous consumption and so on.