Spring Cloud Stream

A message driven overview

1. What is it

Spring Cloud Stream is officially defined as a framework for building message-driven microservices.

Applications interact with binder objects in the Spring Cloud Stream through inputs or outputs. Binding is done by our configuration, and Spring Cloud Stream’s Binder objects are responsible for interacting with the messaging middleware. So, we just need to figure out how to interact with Spring Cloud Stream to facilitate a message-driven approach.

Message event driving is achieved by connecting message broker middleware using Spring Integration. Spring Cloud Stream provides personalized automated configuration implementations for some vendors’ messaging middleware products, referencing the three core concepts of publish – subscribe, consumer group, and partition.

Currently, only RabbitMQ and Kafka are supported.

Summary: Mask the differences of the underlying messaging middleware, reduce switching costs, and unify the messaging programming model.

Liverpoolfc.tv: cloud. Spring. IO/spring – clou…

2. Design idea

(1) standard MQ

The Message content: Message is transmitted between producers and consumers through the Message media

Messages must travel through a specific channel: MessageChannel

How is the message consumed in the MessageChannel, and who is responsible for sending and receiving it: the subinterface of the MessageChannel MessageChannel, SubscribableChannel, is subscribed by the MessageHandler MessageHandler

(2) Why use Cloud Stream

For example, we use RabbitMQ and Kafka. Because of the architectural differences between the two messaging middleware, such as Exchange for RabbitMQ and Topic and Partitions for Kafka, there are differences.

① Why can stream unify the underlying differences?

In the absence of the concept of binders, when our SpringBoot applications interact directly with message-oriented middleware, the implementation details of each of the message-oriented middleware are quite different due to the different purposes for which they were built. By defining the binder as the middle tier, the separation between the application and the details of the messaging middleware is perfectly achieved. By exposing a uniform Channel Channel to the application, it eliminates the need for the application to consider various messaging middleware implementations.

(2) Binder

INPUT corresponds to the consumer and OUTPUT corresponds to the producer

By defining a Binder as an intermediate layer, the application is isolated from the details of the messaging middleware.

(3) Message communication in Stream follows the publish-subscribe pattern

Topics broadcast: Exchange in RabbitMQ and Topic in Kakfa

3. Standard process routine

Binder: A convenient way to connect middleware and mask differences.

Channel: A Channel is an abstraction of Queue. It is the medium for storing and forwarding messages in message communication systems. A Channel is used to configure queues.

Source and Sink: Simply understand that the reference object is Spring Cloud Stream itself, releasing messages from Stream is output, and receiving messages is input.

4. Code APIS and common annotations

Ii. Case Description

1. The RabbitMQ environment is OK

2. Create three submodules in the project

Cloud-stream-rabbitmq-provider8801, which serves as the message sending module for the producer

Cloud-stream-rabbitmq-consumer8802 as the message receiving module

Cloud-stream-rabbitmq-consumer8803 as the message receiving module

Message driven producers

1. Create a Module

cloud-stream-rabbitmq-provider8801

2, POM

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Copy the code

3, YML

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
      stream:
        binders: Configure the rabbitMQ service to be bound to;
          defaultRabbit: # represents the name of the definition, which is used for binding integrations
            type: rabbit # Message component type
            environment: Set rabbitMQ environment configuration
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings: Integration of services
          output: The name # is the name of a channel
            destination: studyExchange # indicates the Exchange name definition to use
            content-type: application/json Set the message type to JSON and the text to "text/plain".
            binder: defaultRabbit Set the specific Settings of the message service to be bound

eureka:
  client: # Configure Eureka registration on client
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Set the heartbeat interval (default is 30 seconds)
    lease-expiration-duration-in-seconds: 5 # If the current interval exceeds 5 seconds (default is 90 seconds)
    instance-id: send-8801.com  # Display host name when listing information
    prefer-ip-address: true     # Change the access path to an IP address
Copy the code

4, the main startup class StreamMQMain8801

@SpringBootApplication
public class StreamMQMain8801{
    public static void main(String[] args){ SpringApplication.run(StreamMQMain8801.class,args); }}Copy the code

5. Business

(1) Interface for sending messages

public interface IMessageProvider{
    public String send(a) ;
}
Copy the code

(2) Send message interface implementation class

@EnableBinding(Source.class) // Can be understood as the definition of a message sending pipe
public class MessageProviderImpl implements IMessageProvider{
    @Resource
    private MessageChannel output; // The channel where the message is sent

    @Override
    public String send(a){
        String serial = UUID.randomUUID().toString();
        this.output.send(MessageBuilder.withPayload(serial).build()); // Create and send the message
        System.out.println("***serial: "+serial);
        returnserial; }}Copy the code

(3)Controller

@RestController
public class SendMessageController{
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage(a){
        returnmessageProvider.send(); }}Copy the code

6, test,

Start 7001eureka, RabbitMQ, and 8801

Visit: http://localhost:8801/sendMessage

4. Message-driven consumers

1. Create a Module

cloud-stream-rabbitmq-consumer8802

2, POM

<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
	<groupId>org.springframework.cloud</groupId>
	<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
Copy the code

3, YML

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
      stream:
        binders: Configure the rabbitMQ service to be bound to;
          defaultRabbit: # represents the name of the definition, which is used for binding integrations
            type: rabbit # Message component type
            environment: Set rabbitMQ environment configuration
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings: Integration of services
          input: The name # is the name of a channel
            destination: studyExchange # indicates the Exchange name definition to use
            content-type: application/json Set the type of the message to "object json" or "text/plain".
            binder: defaultRabbit Set the specific Settings of the message service to be bound

eureka:
  client: # Configure Eureka registration on client
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # Set the heartbeat interval (default is 30 seconds)
    lease-expiration-duration-in-seconds: 5 # If the current interval exceeds 5 seconds (default is 90 seconds)
    instance-id: receive-8802.com  # Display host name when listing information
    prefer-ip-address: true     # Change the access path to an IP address
Copy the code

4, the main startup class StreamMQMain8802

@SpringBootApplication
public class StreamMQMain8802{
    public static void main(String[] args){ SpringApplication.run(StreamMQMain8802.class,args); }}Copy the code

5. Business

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController{
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        System.out.println("Consumer 1, -------> Received message:" + message.getPayload()+"\t port: "+serverPort); }}Copy the code

6, test 8801 send 8802 receive message

http://localhost:8801/sendMessage

Group consumption and persistence

1, according to 8802, clone out a run 8803

2, start,

RabbitMQ, 7001 service registration, 8801 message production, 8802 message consumption, 8803 message consumption

3. There are two problems after operation

There are duplicate consumption issues, message persistence issues

4, consumption

At present, we have received both 8802/8803 at the same time. There is a problem of repeated consumption

How to solve: Grouping and persistent attribute group

Production case: for example, in the following scenario, the order system in cluster deployment will get the order information from RabbitMQ. If the same order is obtained by two services at the same time, the data error will be caused. We need to avoid this situation. At this point we can use message grouping in Stream to solve this problem.

Note that multiple consumers in the same group in the Stream are competing, ensuring that the message is consumed only once by one of the applications. Different groups can consume all (repeat consumption), and there will be competition within the same group, only one of them can consume.

5, grouping

(1) principle

Placing microservice applications in the same group ensures that messages are consumed only once by one of them.

Different groups can consume, and there will be competition within the same group, only one of them can consume.

(2) operation

8802/8803 implements polling grouping with only one consumer at a time. Messages sent by module 8801 can only be received by either 8802 or 8803, thus avoiding repeated consumption.

8802 and 8803 become the same group, and the two groups are the same: atguiguA

8802, 8803 Modify YML

input: The name # is the name of a channel and will be explained when analyzing the specific source code
  destination: studyExchange # indicates the Exchange name definition to use
  content-type: application/json Set the type of the message to "object json" or "text/plain".
  binder: defaultRabbit Set the specific Settings of the message service to be bound
  group: atguiguA
Copy the code

(3) conclusion

For multiple microservice instances of the same group, only one at a time will be received

6. Persistence

Stop 8802/8803 and remove 8802 from group atguiguA. 8801 sends four messages to RabbitMQ

Start 8802 first, no group attribute configuration, no message is typed in the background

Start 8803 again, have group attribute configuration, the background type MQ message (four messages received by 8803)