Introduction to the
SpringCloud Stream is a highly extensible event-driven microservice component for building connections to shared messaging systems. It provides a flexible programming model for setting up stand-alone production-level Spring applications based on Spring Boot, and using Spring Integration to provide connectivity to message brokers allows us to use it with little concern for the specific message queue implementation. It shields the differences of underlying messaging middleware, reduces switching costs, unifies messaging programming model, and enables developers to pay more attention to their own business.
Architectural model
Maybe we could look at a simpler picture
As you can see, each system only relies on its own Binder to interact with message-oriented middleware or other systems. Stream hides all the details of sending messages and only cares about three core modules
- Destination BindersThe target binder that tells the Stream which message queue service you want to bind to
Binder
Implement. For example,RabbitMQ
orKafka
的Binder
? This is its core building block, responsible for supporting and providing integration with external systems or external messaging systems that we own - Destination BindingsDestination binding, which provides a bridge between message producers and consumers to the Stream. For example,
RabbitMQ
You need to tell the Stream what the current system is using to send messageschannel -> exchange -> routingKey -> queue
What are they (of course this is all done in the configuration file) - Message: Is the Message we need to send
For any message, just provide the three core modules above and we don’t have to worry about the details of sending.
Until The 3.2.1 release of SpringCloud Stream, it supported almost all of the popular message queuing products on the market. RabbitMQ, Kafka, RocketMQ, AWS SNS/SQS, etc., mainly due to this trend of centralization, different messaging middleware vendors have developed their own binders to provide SpringCloud Stream.
Early experience
Take RabbitMQ as an example to experience Stream message driver development. First we need to introduce dependencies
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Copy the code
Make sure you have a basic understanding of the components in RabbitMQ before specifying them in the configuration file. Otherwise read the RabbitMQ Basics
producers
Configuration file:
Rabbitmq: host: 129.204.178.49 # Your rabbitMQ service port: 5672 username: guest password: guest cloud: Bindings: output channel-demo: # channel Binders: binders: binder type: Rabbit # rabbitMQ Rabbit: Bindings: output-channel Producer: routing-key-expression: ''demoRoutingKey''Copy the code
Declare output channel
Public interface MessageSource {@output ("output-channel-demo") MessageChannel Output(); }Copy the code
Define a channel binding class
/** * This annotation is used to specify one or more interfaces that define @input or @output annotations. * */ @enableBinding (MessageSource. Class) public class MessageSourceHandler {}Copy the code
Next we write an integration test and send the message
@Autowired MessageSource messageSource; / messaging Test * * * / @ Test public void Test () {messageSource. The output (). The send (MessageBuilder. WithPayload (" Test "). The build ()); }Copy the code
Now that the message has been successfully sent, let’s write the consumer
consumers
The configuration file
Binders: Cloud: stream: binders: # Binders: Type: rabbit # rabbitMQ Rabbit: Bindings: input-channel-demo: Bindings: input-channel-demo: consumer: binding-routing-key: 'demoRoutingKey' Bindings: input-channel-demo: SomeGroup # Prevents multiple consumer instances from receiving messages repeatedly, such that a message is sent to only one instance of the same group Host: 129.204.178.49 Port: 5672 username: guest password: guestCopy the code
Declare input channel
Public interface MessageSink {@input ("input-channel-demo") SubscribableChannel Input(); }Copy the code
Declaring a bound class
@enableBinding (messagesink.class) public class MessageSinkHandler {/** * listen for input-channel-demo messages, The @StreamListener annotation supports SPEL expressions, * */ @streamListener ("input-channel-demo") public void consume(String message){ System.out.println(" received message: "+message); }}Copy the code
A complete SpringCloud Stream microservice message-driven demo is completed, the application is launched, and consumers can successfully receive test messages sent by producers. To use SpringCloud Stream well you must understand the content of the configuration file!
GitHub source address SpringCloud-stream introductory case
Sending delayed messages
Sending deferred messages in SpringCloud Stream is very simple. First we need to specify the type of switch to be a deferred switch in the producer/consumer profile
Bindings: input-channel-demo: # Message input channel Consumer: delayed-exchange: true binding-routing-key: 'demoRoutingKey'Copy the code
Same as the producer, omitted here. Then you just need to add a header to the code you send above
/ / set the message for 30 seconds after a messageSource to consumers. The output (). The send (MessageBuilder. WithPayload (" test "). SetHeader (" x - delay ", 30 * 1000).build());Copy the code
If you send delayed messages and raise unknown Exchange type ‘X-delayed -message’, it is because your RabbitMQ service does not have a delay queue plug-in installed. Go to the official website to install it
The business of deferring messages is now in place, and as you can see here it is very easy to integrate messages using SpringCloud Stream, for example Almost all configurations are in RabbitConsumerProperties, RabbitProductProperties, and the common producer and consumer properties are in their parent RabbitCommonProperties. Almost all of RabbitMQ’s features and functions can be done directly in the configuration file. RabbitMQ Consumer Properties has limited capabilities. For details on other advanced features, see RabbitMQ Consumer Properties
But if you think that’s true, you’re wrong, as SpringBoot, which is easy to use and takes 20% of your energy, can take 200% of your energy to play well. SpringCloud Stream actually contains a series of complex technical systems, such as Spring Intergration, Spring Message, Spring AMQP and so on. Its internal principle implementation and component integration are very complicated.
I think one of the reasons SpringCloud Stream hasn’t caught on this long is that there are so many things involved in this technical system, and if something goes wrong in a production environment and you have to read the source code, it’s a lot more technical work than expected.
Spring Message
Spring Message is a sub-module of the Spring Framework that defines a unified programming model for messages, and in fact SpringCloud Stream is based on its implementation.
Spring Message defines the Message programming model in the figure above, proposing the abstraction of Channel Channel and Message Message. All messages are sent by the producer to the Message middleware in Output Channel, and then all consumers get messages from the Input Channel Input. The Message itself consists of two parts, the header and the payload.
The core annotations we covered in our initial experience above are the embodiment of this model
- @output: represents the Output channel from which the producer sends a message
- @input: represents the Input channel from which the consumer reads the message
- @enableBinding: Binds the interface that defines the channel to a
Bean
So that we can pass theBean
The operation channel sends and receives messages. - StreamListener: Subscribes to messages in the input channel
SpringCloud Function functional programming
After Release 3.1 of SpringCloud Stream, you will find that several core annotations such as @enableBinding have been deprecated by the official annotation. This is due to the official release of the updated functional programming model SpringCloud Function. Try to push programming to a higher level with this component. This article does not cover this component in detail, but shows you how to combine SpringCloud Function for message sending and consumption in SpringCloud Stream.
The channel naming of messages in conjunction with the SpringCloud Function follows the following convention
- Input:
<functionName> + -in- + <index>
- Output:
<functionName> + -out- + <index>
Index represents the input or output bound index, so for now we just write 0.
Task-based message
Referring to the official document Suppliers (Sources), we start by writing a producer’s method for sending messages.
@bean public Supplier<String> source1() {return () -> "test timing message "; }Copy the code
Then according to the rules of the channel in the application. The configuration in the yml channel called source1 – out – 0, to configure spring. Cloud. The function. The destination = source1, specify the functions of the function name.
Next we write about consumers, and again we need a way to consume.
@bean public Consumer<String> sink1() {return message -> system.out.println (" Received message :" + message); }Copy the code
Then change the channel name in the configuration file to sink1-in-0 according to the channel rules. A simple timed message is sent and received, and the producer sends a message every second to the consumer. It has to be said that the integration of SpringCloud Stream and SpringCloud Function is really…… It’s amazing.
Business-triggered messages
But we are more likely to use business-triggered sending of messages, so SpringCloud Stream provides us with a StreamBridge component. You can use it to send messages by specifying the channel name
@test public void Test () {streambridge. send(" source1-out0 "," Test message "); }Copy the code
Now that we have finished sending the message, the consumer can just use the above consumption function.
conclusion
I have to say that with the integration of SpringCloud Function, the sending and receiving of messages has moved to a new stage, but the configuration specification
+ -in- +
makes me feel a little uncomfortable…… Even now I think the annotations that were deprecated before 3.1 May be more suitable for our development.
conclusion
SpringCloud Stream only supported RabbitMQ and Kafka when it was used in production projects last year, but now almost all popular messaging-oriented middleware has developed binders to accommodate it, demonstrating its dominance.
Although I always advocate the updating and iteration of technology, I would like to sincerely remind you that we can try to introduce and use it in new projects, and we should be cautious in updating technical components in old projects. After all, SpringCloud Stream involves too many and complex technical systems. This article is just the tip of the iceberg for SpringCloud Stream. We don’t have a good handle on it right now, but I still believe it will become the mainstream of messaging middleware docking!
We can see SpringCloud Stream as a set of technologies that try to push message-driven to the next level, but I think that goal is still a bit distant in terms of actual usage at this point……