Spring Cloud Stream
I. Message-driven overview
1. What is it
Spring Cloud Stream is officially defined as a framework for building message-driven microservices.
An application interacts with Spring Cloud Stream's binder objects through inputs and outputs. The binding itself is set up in configuration, and the binder is responsible for talking to the messaging middleware. So we only need to know how to interact with Spring Cloud Stream to work in a message-driven way.
Event-driven messaging is achieved by using Spring Integration to connect to the message broker. Spring Cloud Stream provides opinionated auto-configuration for some vendors' messaging middleware and introduces three core concepts: publish-subscribe, consumer groups, and partitions.
Currently, only RabbitMQ and Kafka are supported.
In short: it masks the differences between underlying messaging middleware, reduces the cost of switching, and unifies the messaging programming model.
Official site: cloud.spring.io/spring-clou…
2. Design idea
(1) Standard MQ
Message: producers and consumers exchange information through Message objects.
MessageChannel: a message must travel through a specific channel.
How is a message in the channel consumed, and who is responsible for sending and receiving it? SubscribableChannel, a subinterface of MessageChannel, is subscribed to by a MessageHandler.
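The relationship between these three roles can be sketched in plain Java. This is a simplified in-memory model, not Spring's implementation: SimpleMessage, SimpleHandler and SimpleSubscribableChannel are hypothetical stand-ins for Message, MessageHandler and SubscribableChannel, and the channel is assumed to deliver in publish-subscribe style.

```java
import java.util.ArrayList;
import java.util.List;

// Demo entry point: one channel, two subscribed handlers.
class ChannelSketch {
    public static void main(String[] args) {
        SimpleSubscribableChannel channel = new SimpleSubscribableChannel();
        channel.subscribe(m -> System.out.println("handler A got: " + m.payload()));
        channel.subscribe(m -> System.out.println("handler B got: " + m.payload()));
        channel.send(new SimpleMessage("hello"));
    }
}

// Hypothetical stand-in for a message: just a payload.
final class SimpleMessage {
    private final String payload;
    SimpleMessage(String payload) { this.payload = payload; }
    String payload() { return payload; }
}

// Hypothetical stand-in for a message handler.
interface SimpleHandler {
    void handle(SimpleMessage message);
}

// Hypothetical stand-in for a subscribable channel:
// every subscribed handler sees every message.
class SimpleSubscribableChannel {
    private final List<SimpleHandler> handlers = new ArrayList<>();

    void subscribe(SimpleHandler handler) {
        handlers.add(handler);
    }

    // Delivers the message to each handler; returns false if nobody is subscribed.
    boolean send(SimpleMessage message) {
        for (SimpleHandler handler : handlers) {
            handler.handle(message);
        }
        return !handlers.isEmpty();
    }
}
```

The producer only ever calls send on the channel; who receives the message is decided by whoever subscribed, which is the decoupling the three interfaces are meant to provide.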
(2) Why use Cloud Stream
Suppose we use both RabbitMQ and Kafka. The two differ architecturally: RabbitMQ has Exchanges, while Kafka has Topics and Partitions. These differences leak into any application code that talks to a broker directly.
① Why can stream unify the underlying differences?
Without a binder, a Spring Boot application talks to the message-oriented middleware directly, and because each product was built for a different purpose, their implementation details differ considerably. Defining the binder as a middle layer separates the application from those details. The application sees only a uniform Channel, so it no longer has to account for the specifics of each messaging middleware.
② Binder
INPUT corresponds to the consumer and OUTPUT corresponds to the producer
By defining a Binder as an intermediate layer, the application is isolated from the details of the messaging middleware.
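The isolation described above can be illustrated with a small plain-Java sketch. Everything here is hypothetical and exists only for illustration: SketchBinder, RabbitLikeBinder, KafkaLikeBinder and SketchApp are not Spring Cloud Stream types, and the two toy binders only record how they would phrase the destination.

```java
import java.util.ArrayList;
import java.util.List;

class BinderSketch {
    public static void main(String[] args) {
        // The application code is identical; only the binder behind it changes.
        SketchBinder[] binders = { new RabbitLikeBinder(), new KafkaLikeBinder() };
        for (SketchBinder binder : binders) {
            SketchApp app = new SketchApp(binder);
            app.publish("studyExchange", "order-1");
            System.out.println(binder.log());
        }
    }
}

// Hypothetical binder contract: the only thing the application sees.
interface SketchBinder {
    void send(String destination, String payload);
    List<String> log();
}

// Toy binder that treats the destination as a RabbitMQ exchange.
class RabbitLikeBinder implements SketchBinder {
    private final List<String> log = new ArrayList<>();
    public void send(String destination, String payload) {
        log.add("exchange=" + destination + " body=" + payload);
    }
    public List<String> log() { return log; }
}

// Toy binder that treats the destination as a Kafka topic.
class KafkaLikeBinder implements SketchBinder {
    private final List<String> log = new ArrayList<>();
    public void send(String destination, String payload) {
        log.add("topic=" + destination + " value=" + payload);
    }
    public List<String> log() { return log; }
}

// Application code: knows a destination name and a payload, nothing broker-specific.
class SketchApp {
    private final SketchBinder binder;
    SketchApp(SketchBinder binder) { this.binder = binder; }
    void publish(String destination, String payload) {
        binder.send(destination, payload);
    }
}
```

Swapping the middleware means swapping the binder passed to SketchApp; the publish call does not change. In the real framework the same switch is made through dependencies and configuration rather than constructor arguments.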
(3) Message communication in Stream follows the publish-subscribe pattern
Messages are broadcast through topics: an Exchange in RabbitMQ, a Topic in Kafka.
3. Standard workflow
Binder: a convenient way to connect to the middleware while masking its differences.
Channel: an abstraction of a queue. It is the medium that stores and forwards messages in a messaging system, and queues are configured through it.
Source and Sink: the point of reference is Spring Cloud Stream itself. Publishing a message from Stream is output (Source); receiving a message is input (Sink).
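As a rough illustration of the direction of flow, the sketch below models a channel as a plain java.util.Queue with a source writing to it and a sink reading from it. SketchSource and SketchSink are hypothetical names, and the binder and broker that would normally sit between the output and input channels are left out.

```java
import java.util.ArrayDeque;
import java.util.Queue;

class FlowSketch {
    public static void main(String[] args) {
        Queue<String> channel = new ArrayDeque<>(); // the channel: stores and forwards
        SketchSource source = new SketchSource(channel);
        SketchSink sink = new SketchSink(channel);

        source.output("order-1");
        source.output("order-2");

        System.out.println(sink.input()); // first message sent
        System.out.println(sink.input()); // second message sent
    }
}

// Output side: the application hands a message to the stream.
class SketchSource {
    private final Queue<String> channel;
    SketchSource(Queue<String> channel) { this.channel = channel; }
    void output(String payload) { channel.add(payload); }
}

// Input side: the application receives a message from the stream.
class SketchSink {
    private final Queue<String> channel;
    SketchSink(Queue<String> channel) { this.channel = channel; }
    // Returns the oldest pending message, or null when the channel is empty.
    String input() { return channel.poll(); }
}
```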
4. Code APIs and common annotations
II. Case description
1. Make sure the RabbitMQ environment is up and running
2. Create three submodules in the project
cloud-stream-rabbitmq-provider8801, the producer module that sends messages
cloud-stream-rabbitmq-consumer8802, a message receiving module
cloud-stream-rabbitmq-consumer8803, a message receiving module
III. Message-driven producers
1. Create a Module
cloud-stream-rabbitmq-provider8801
2. POM
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3. YML
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: # Configure the RabbitMQ service to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings below
          type: rabbit # Message component type
          environment: # RabbitMQ environment configuration
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration
        output: # Name of the channel
          destination: studyExchange # Name of the Exchange to use
          content-type: application/json # Message type; JSON here, use "text/plain" for text
          binder: defaultRabbit # The binder (message service configuration) to bind to

eureka:
  client: # Eureka client registration
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Heartbeat interval (default is 30 seconds)
    lease-expiration-duration-in-seconds: 5 # Expire the instance if no heartbeat arrives within 5 seconds (default is 90 seconds)
    instance-id: send-8801.com # Instance name shown in the service list
    prefer-ip-address: true # Show the IP address in the access path
4. Main startup class StreamMQMain8801
@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}
5. Business
(1) Interface for sending messages
public interface IMessageProvider {
    String send();
}
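(2) Implementation class of the message-sending interface
import java.util.UUID;
import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class) // Declares the binding for the message-sending (output) pipe
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output; // The channel the message is sent through

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        this.output.send(MessageBuilder.withPayload(serial).build()); // Build and send the message
        System.out.println("***serial: " + serial);
        return serial;
    }
}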
(3) Controller
@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage() {
        return messageProvider.send();
    }
}
6. Test
Start the Eureka server on 7001, RabbitMQ, and the 8801 module.
Visit: http://localhost:8801/sendMessage
IV. Message-driven consumers
1. Create a Module
cloud-stream-rabbitmq-consumer8802
2. POM
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
3. YML
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: # Configure the RabbitMQ service to bind to
        defaultRabbit: # Name of this binder definition, referenced by the bindings below
          type: rabbit # Message component type
          environment: # RabbitMQ environment configuration
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration
        input: # Name of the channel
          destination: studyExchange # Name of the Exchange to use
          content-type: application/json # Message type; JSON here, use "text/plain" for text
          binder: defaultRabbit # The binder (message service configuration) to bind to

eureka:
  client: # Eureka client registration
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Heartbeat interval (default is 30 seconds)
    lease-expiration-duration-in-seconds: 5 # Expire the instance if no heartbeat arrives within 5 seconds (default is 90 seconds)
    instance-id: receive-8802.com # Instance name shown in the service list
    prefer-ip-address: true # Show the IP address in the access path
4. Main startup class StreamMQMain8802
@SpringBootApplication
public class StreamMQMain8802 {
    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}
5. Business
@Component
@EnableBinding(Sink.class) // Declares the binding for the message-receiving (input) pipe
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT) // Invoked for every message that arrives on the input channel
    public void input(Message<String> message) {
        System.out.println("Consumer 1, -------> Received message: " + message.getPayload() + "\t port: " + serverPort);
    }
}
6. Test: 8801 sends a message and 8802 receives it
http://localhost:8801/sendMessage
V. Group consumption and persistence
1. Clone 8802 to create and run 8803
2. Start
RabbitMQ, 7001 (service registry), 8801 (message producer), 8802 (message consumer), 8803 (message consumer)
3. Two problems appear after running
Duplicate consumption and message persistence.
4. Consumption
At present, 8802 and 8803 both receive every message, so each message is consumed twice.
Solution: grouping and persistence, both controlled by the group property.
Production example: in a clustered order system, several instances fetch order information from RabbitMQ. If two instances obtain the same order at the same time, the data becomes inconsistent, so this must be avoided. Message grouping in Stream solves the problem.
Note that consumers in the same group compete with one another, which ensures that a message is consumed only once, by one of the applications in the group. Different groups each receive every message (so it is consumed once per group).
5. Grouping
(1) Principle
Placing microservice instances in the same group ensures that each message is consumed only once, by one of them.
Different groups each consume every message, while members of the same group compete and only one of them consumes it.
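The rule can be made concrete with a toy model in plain Java. This is not Spring Cloud Stream or RabbitMQ code: SketchExchange is a hypothetical class, the names groupA, groupB and 9001 are made up for the example, and round-robin delivery inside a group is an assumption chosen to keep the output deterministic (a real broker decides for itself which competing consumer gets a message).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class GroupSketch {
    public static void main(String[] args) {
        SketchExchange exchange = new SketchExchange();
        exchange.join("groupA", "8802");
        exchange.join("groupA", "8803");
        exchange.join("groupB", "9001");
        for (int i = 1; i <= 4; i++) {
            exchange.publish("msg-" + i);
        }
        // prints {8802=[msg-1, msg-3], 8803=[msg-2, msg-4], 9001=[msg-1, msg-2, msg-3, msg-4]}
        System.out.println(exchange.received());
    }
}

// Toy model of one destination with consumer groups.
class SketchExchange {
    // group name -> consumer names, in join order
    private final Map<String, List<String>> groups = new LinkedHashMap<>();
    // group name -> index of the member that takes the next message
    private final Map<String, Integer> next = new LinkedHashMap<>();
    // consumer name -> messages it received
    private final Map<String, List<String>> received = new LinkedHashMap<>();

    void join(String group, String consumer) {
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(consumer);
        next.putIfAbsent(group, 0);
        received.put(consumer, new ArrayList<>());
    }

    // Every group gets the message; inside a group exactly one member gets it.
    void publish(String message) {
        for (Map.Entry<String, List<String>> entry : groups.entrySet()) {
            List<String> members = entry.getValue();
            int index = next.get(entry.getKey());
            received.get(members.get(index)).add(message);
            next.put(entry.getKey(), (index + 1) % members.size());
        }
    }

    Map<String, List<String>> received() {
        return received;
    }
}
```

The two members of groupA split the four messages between them, while the single member of groupB sees all four: competition inside a group, broadcast across groups.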
(2) Operation
Put 8802 and 8803 in the same group so that they take turns and only one of them consumes each message. A message sent by module 8801 is then received by either 8802 or 8803, which avoids duplicate consumption.
8802 and 8803 join the same group; both use the group name atguiguA.
Modify the YML of 8802 and 8803:
input: # Name of the channel
  destination: studyExchange # Name of the Exchange to use
  content-type: application/json # Message type; JSON here, use "text/plain" for text
  binder: defaultRabbit # The binder (message service configuration) to bind to
  group: atguiguA
(3) Conclusion
Among multiple microservice instances in the same group, only one receives each message.
6. Persistence
Stop 8802 and 8803, and remove 8802 from group atguiguA. Then have 8801 send four messages to RabbitMQ.
Start 8802 first: it has no group property, so no messages are printed in its console.
Then start 8803: it has the group property, so the MQ messages are printed in its console (8803 receives all four).
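One way to picture why the grouped consumer catches up while the ungrouped one does not is the toy model below. It is a plain-Java sketch with a hypothetical SketchBroker class, and it assumes the usual behaviour that a named group is backed by a durable queue which keeps buffering while its consumers are offline, whereas a consumer without a group gets a fresh temporary queue each time it connects.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class DurabilitySketch {
    public static void main(String[] args) {
        SketchBroker broker = new SketchBroker();
        // The group's queue already exists from an earlier run of a grouped consumer.
        broker.declareDurableQueue("atguiguA");
        // Both consumers are stopped while 8801 sends four messages.
        for (int i = 1; i <= 4; i++) {
            broker.publish("msg-" + i);
        }
        System.out.println("8802 (no group): " + broker.connectAnonymous());
        System.out.println("8803 (group): " + broker.connectGroup("atguiguA"));
    }
}

// Toy broker: durable queues keep buffering while their consumers are offline.
class SketchBroker {
    private final Map<String, List<String>> durableQueues = new HashMap<>();

    void declareDurableQueue(String group) {
        durableQueues.putIfAbsent(group, new ArrayList<>());
    }

    // A message is copied into every queue that exists at publish time.
    void publish(String message) {
        for (List<String> queue : durableQueues.values()) {
            queue.add(message);
        }
    }

    // An anonymous consumer gets a brand-new, empty queue: nothing sent earlier is in it.
    List<String> connectAnonymous() {
        return new ArrayList<>();
    }

    // A grouped consumer drains whatever accumulated in its group's durable queue.
    List<String> connectGroup(String group) {
        List<String> queue = durableQueues.getOrDefault(group, new ArrayList<>());
        List<String> backlog = new ArrayList<>(queue);
        queue.clear();
        return backlog;
    }
}
```

In the sketch, 8802 starts with an empty list and 8803 receives the four buffered messages, matching the behaviour observed in the steps above.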