This is the 16th day of my participation in Gwen Challenge

Spring Cloud Bus

I. Overview

Spring Cloud Bus is a message bus that, together with Spring Cloud Config, enables automatic, dynamic refresh of configuration across a distributed system. It integrates Java's event-handling mechanism with messaging middleware and currently supports two brokers, RabbitMQ and Kafka. Spring Cloud Bus manages and propagates messages between distributed systems, acting like a distributed Actuator, and can be used to broadcast state changes, push events, and so on. It can also serve as a communication channel between microservices.

II. Environment setup

The design used here: send a notification to the server of the configuration center, and the server propagates the notification to the clients. RabbitMQ is used as the messaging middleware.

Add the dependency to server 3344

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

Add the RabbitMQ configuration to server 3344

# ...
spring:
  application:
    name: cloud-config-center
  # RabbitMQ configuration
  rabbitmq:
    host: 47.95.226.96
    port: 5672
    username: guest
    password: guest
# ...
# RabbitMQ-related configuration: expose the bus-refresh endpoint
management:
  endpoints:
    web:
      exposure:
        include: 'bus-refresh'

Add the dependency to clients 3355/3366

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-bus-amqp</artifactId>
</dependency>

Add the RabbitMQ configuration to clients 3355/3366

spring:
  application:
    name: config-client
  # MQ configuration
  rabbitmq:
    host: 47.95.226.96
    port: 5672
    username: guest
    password: guest

Test: with the configuration above in place, modify the configuration file again, then send a POST request to the configuration center server at http://localhost:3344/actuator/bus-refresh. The refresh is broadcast to all clients. How it works: all ConfigClient instances listen to the same topic in MQ. When one service refreshes its data, it publishes that information to the topic, so the other services listening to the same topic are notified and update their own configuration.

Targeted refresh

Instead of notifying every client, you can refresh only a specified service instance each time the configuration file is modified. For example, after modifying the configuration file on GitHub, only 3355 should see the change. To do this, send a POST request to http://localhost:3344/actuator/bus-refresh/{service name}:{port}, for example: http://localhost:3344/actuator/bus-refresh/config-client:3355
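The two refresh calls can also be made from Java rather than from a command-line tool. The following is a minimal sketch using the JDK 11+ `java.net.http` client; `BusRefreshRequests` and its methods are hypothetical helper names, not part of Spring Cloud Bus, and the base URL is the one used in this article.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Hypothetical helper (not a Spring Cloud Bus API): builds the refresh requests described above.
final class BusRefreshRequests {
    // bus-refresh endpoint of the configuration center server from this article (port 3344).
    static final String BASE = "http://localhost:3344/actuator/bus-refresh";

    // POST to the bare endpoint: the refresh is broadcast to every client on the bus.
    static HttpRequest refreshAll() {
        return post(URI.create(BASE));
    }

    // POST to .../bus-refresh/{serviceName}:{port}: only the matching instance is refreshed.
    static HttpRequest refreshOne(String serviceName, int port) {
        return post(URI.create(BASE + "/" + serviceName + ":" + port));
    }

    // Builds a POST request with an empty body.
    private static HttpRequest post(URI uri) {
        return HttpRequest.newBuilder(uri)
                .POST(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    // Sends the request and returns the HTTP status code; the server must be running.
    static int send(HttpRequest request) throws Exception {
        HttpResponse<Void> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.discarding());
        return response.statusCode();
    }
}
```

With the servers from this article running, `BusRefreshRequests.send(BusRefreshRequests.refreshOne("config-client", 3355))` would refresh only the 3355 instance, while `refreshAll()` targets every client.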

Message Driven - Spring Cloud Stream

I. Overview

1. Why introduce Stream

There are many kinds of messaging middleware today, and learning all of them is a heavy burden. Different platforms use different middleware, and a single system may even contain several MQs, which makes them hard to work with in practice. What is needed is a way to adapt through bindings and switch between different MQs automatically. Spring Cloud Stream shields the differences between the underlying messaging middleware, lowers the cost of switching, and unifies the messaging programming model.

2. What is Spring Cloud Stream

Spring Cloud Stream is a framework for building message-driven microservices. Applications interact with the binder objects in Stream through inputs and outputs. Bindings are set up through configuration, and Stream's binder objects are responsible for interacting with the messaging middleware. So you only need to know how to interact with Stream to use message-driven development conveniently. Stream uses Spring Integration to connect to the message broker and implement event-driven messaging, and it introduces three core concepts: publish-subscribe, consumer groups, and partitions.

3. Design idea

Standard messaging middleware, before Spring Cloud Stream

Producers and consumers exchange content through a message medium, the Message. Messages must travel through a specific channel, the MessageChannel. Messages in the channel are consumed through SubscribableChannel, a sub-interface of MessageChannel, to which MessageHandler message processors subscribe. If a system uses both RabbitMQ and Kafka, the two have different architectures and cannot communicate with each other directly.

After using Spring Cloud Stream

Spring Cloud Stream shields the underlying differences: it defines the binder as a middle layer that isolates the application from the details of the messaging middleware. By exposing a unified Channel to the application, the application no longer needs to care about the different middleware implementations. For the binder, input corresponds to the consumer and output corresponds to the producer.

Using the binder as an intermediate layer is what isolates the application from the details of the messaging middleware.
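The idea of a binder as a middle layer can be illustrated with a plain-Java sketch. None of the types below are Spring Cloud Stream's real API: `SimpleBinder`, `FakeRabbitBinder`, `FakeKafkaBinder`, and `BinderDemo` are hypothetical names, and the two "brokers" are in-memory stand-ins. The point is only that the application code depends on one interface, so the middleware behind it can be swapped without touching that code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for a binder: the only messaging type the application sees.
interface SimpleBinder {
    void subscribe(String destination, Consumer<String> handler);
    void send(String destination, String payload);
}

// In-memory stand-in for a RabbitMQ-backed binder: destinations are treated as exchanges.
final class FakeRabbitBinder implements SimpleBinder {
    private final Map<String, List<Consumer<String>>> exchanges = new HashMap<>();

    public void subscribe(String destination, Consumer<String> handler) {
        exchanges.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }

    public void send(String destination, String payload) {
        exchanges.getOrDefault(destination, List.of()).forEach(h -> h.accept(payload));
    }
}

// In-memory stand-in for a Kafka-backed binder: destinations are treated as topics with a log.
final class FakeKafkaBinder implements SimpleBinder {
    private final Map<String, List<String>> topicLogs = new HashMap<>();
    private final Map<String, List<Consumer<String>>> readers = new HashMap<>();

    public void subscribe(String destination, Consumer<String> handler) {
        readers.computeIfAbsent(destination, d -> new ArrayList<>()).add(handler);
    }

    public void send(String destination, String payload) {
        // Append to the topic log first, then hand the record to every current reader.
        topicLogs.computeIfAbsent(destination, d -> new ArrayList<>()).add(payload);
        readers.getOrDefault(destination, List.of()).forEach(h -> h.accept(payload));
    }
}

// Application code: written once against SimpleBinder, unaware of which broker is behind it.
final class BinderDemo {
    static List<String> roundTrip(SimpleBinder binder) {
        List<String> received = new ArrayList<>();
        binder.subscribe("studyExchange", received::add); // input side (consumer)
        binder.send("studyExchange", "hello");            // output side (producer)
        return received;
    }
}
```

Calling `BinderDemo.roundTrip` with either fake binder runs the same application code unchanged, which is the switching-cost argument made above in miniature.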

Spring Cloud Stream standard flow and common annotations
  • Binder: connects to the middleware and masks the differences between implementations.
  • Channel: an abstraction of a queue, used in a messaging system to store and forward messages.
  • Source and Sink: output (publishing messages) and input (receiving messages), respectively.
  • @Input: marks an input channel, through which received messages enter the application.
  • @Output: marks an output channel, through which published messages leave the application.
  • @StreamListener: listens on a queue; used to receive messages on the consumer side.
  • @EnableBinding: binds the Channel and the Exchange together.

II. Environment setup

1. Producers

Create the cloud-stream-rabbitmq-provider8801 module and import the dependencies

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

The configuration file

server:
  port: 8801
spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders:  # RabbitMQ service(s) to bind to
        defaultRabbit:  # name of this binder definition, referenced by the bindings below
          type: rabbit  # message component type
          environment:  # RabbitMQ environment settings
            spring:
              rabbitmq:
                host: 47.95.226.96
                port: 5672
                username: guest
                password: guest
      bindings:         # channel bindings
        output:         # channel name
          destination: studyExchange  # name of the Exchange to use
          content-type: application/json  # set the message type to JSON
          binder: defaultRabbit  # binder to use for this binding

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2  # heartbeat interval (default is 30 seconds)
    lease-expiration-duration-in-seconds: 5  # expire after 5 seconds without a heartbeat (default is 90 seconds)
    instance-id: send-8801.com  # instance name shown in the information list
    prefer-ip-address: true     # show the IP address in the access path

The Service that produces the message

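import java.util.UUID;
import javax.annotation.Resource;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@EnableBinding(Source.class)        // Define the push pipe for the message
public class IMessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output; // Message sending pipeline

    @Override
    public String send() {
        // Use a random UUID as the message payload
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println(serial);
        return null;
    }
}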

Controller

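import javax.annotation.Resource;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider iMessageProvider;

    // Each GET request publishes one message through the provider
    @GetMapping("/sendMessage")
    public String sendMessage() {
        return iMessageProvider.send();
    }
}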

Visit http://localhost:8801/sendMessage to send a message to RabbitMQ.

2. Consumers

Create the cloud-stream-rabbitmq-consumer8802 module and import the dependencies

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

The configuration file

server:
  port: 8802
spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders:  # RabbitMQ service(s) to bind to
        defaultRabbit:  # name of this binder definition, referenced by the bindings below
          type: rabbit  # message component type
          environment:  # RabbitMQ environment settings
            spring:
              rabbitmq:
                host: 47.95.226.96
                port: 5672
                username: guest
                password: guest
      bindings:         # channel bindings
        input:          # channel name
          destination: studyExchange  # name of the Exchange to use
          content-type: application/json  # set the message type to JSON
          binder: defaultRabbit  # binder to use for this binding

eureka:
  client:
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2  # heartbeat interval (default is 30 seconds)
    lease-expiration-duration-in-seconds: 5  # expire after 5 seconds without a heartbeat (default is 90 seconds)
    instance-id: receive-8802.com  # instance name shown in the information list
    prefer-ip-address: true     # show the IP address in the access path

The Controller used to receive messages

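import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableBinding(Sink.class)      // The pipe that receives the message
public class ReceiveMessageController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT) // Listen to the queue
    public void input(Message<String> message) {
        System.out.println("consumer8802:" + message.getPayload() + "\t" + serverPort);
    }
}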

Duplicate consumption in Stream

Create a second consumer module, cloud-stream-rabbitmq-consumer8803. With two consumers consuming messages, two problems appear: duplicate consumption and message persistence.

Duplicate consumption: when a message is sent to MQ now, both consumers consume it. If, for example, one order message is consumed by two consumers, the data will be wrong. Consumer groups in Stream solve this: consumers in the same group compete with each other, which ensures that a message is consumed by only one of them.

Add a group to the 8802 and 8803 configuration files

bindings:         # channel bindings
  input:          # channel name
    destination: studyExchange  # name of the Exchange to use
    content-type: application/json  # set the message type to JSON
    binder: defaultRabbit  # binder to use for this binding
    group: yylmA  # consumer group name

When a message is sent again, it is consumed by only one consumer.
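The group semantics can be sketched in plain Java without a broker. `GroupedExchange` below is a hypothetical in-memory model, not Spring Cloud Stream code: every group receives each message once, and inside a group the members take turns. Round-robin is an assumption made for this sketch; with a real broker, which member of the group receives a given message is decided by the middleware.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of an exchange with consumer groups.
final class GroupedExchange {
    // Group name -> inboxes of the consumers that joined that group.
    private final Map<String, List<List<String>>> groups = new LinkedHashMap<>();
    // Group name -> number of messages delivered to that group so far.
    private final Map<String, Integer> delivered = new LinkedHashMap<>();

    // Registers a consumer in the given group and returns its inbox.
    List<String> join(String group) {
        List<String> inbox = new ArrayList<>();
        groups.computeIfAbsent(group, g -> new ArrayList<>()).add(inbox);
        return inbox;
    }

    // Each group gets the message exactly once; members of a group take turns.
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : groups.entrySet()) {
            List<List<String>> members = entry.getValue();
            int index = delivered.merge(entry.getKey(), 1, Integer::sum) - 1;
            members.get(index % members.size()).add(message);
        }
    }
}
```

If 8802 and 8803 both join `yylmA`, one published message lands in exactly one of the two inboxes. If they join different groups, each inbox receives its own copy, which corresponds to the duplicate consumption described above.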

Stream message persistence

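If the consumer's server has not started yet, or is down, when the producer sends a message, the message is missed. Configuring a Stream group as shown above also solves this problem: a consumer that belongs to a group receives the messages it missed once it starts again.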
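Why a group helps with persistence can be shown with another small in-memory sketch. `DurableExchange` is a hypothetical model, not broker code: a named group owns a queue that keeps existing while no consumer is connected, whereas a consumer without a group only sees messages published while it is connected. The sketch assumes the group's queue was already declared, for example because the consumer had run once before.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical in-memory model of durable (grouped) versus anonymous subscriptions.
final class DurableExchange {
    // Queues of named groups survive while their consumers are offline.
    private final Map<String, Queue<String>> durableQueues = new HashMap<>();
    // Inboxes of anonymous consumers that are connected right now.
    private final List<List<String>> liveAnonymous = new ArrayList<>();

    // Declares the queue of a group without connecting a consumer.
    void declareGroup(String group) {
        durableQueues.computeIfAbsent(group, g -> new ArrayDeque<>());
    }

    // An anonymous consumer connects; it starts with an empty inbox.
    List<String> connectAnonymous() {
        List<String> inbox = new ArrayList<>();
        liveAnonymous.add(inbox);
        return inbox;
    }

    // The message is stored in every group queue and pushed to connected anonymous consumers.
    void publish(String message) {
        durableQueues.values().forEach(queue -> queue.add(message));
        liveAnonymous.forEach(inbox -> inbox.add(message));
    }

    // A grouped consumer (re)connects and drains whatever was queued while it was away.
    List<String> connectGroup(String group) {
        Queue<String> queue = durableQueues.computeIfAbsent(group, g -> new ArrayDeque<>());
        List<String> backlog = new ArrayList<>(queue);
        queue.clear();
        return backlog;
    }
}
```

Publishing two messages while every consumer is down and then calling `connectGroup("yylmA")` returns both messages in order, while an anonymous consumer that connects afterwards starts with an empty inbox.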