The last:SpringCloud Hoxton + SpringCloud Alibaba learning Notes (5) — Gateway new generation Gateway

A message driven overview

1. What is it

① in a word

Shielding the differences of underlying messaging middleware, reducing switching versions, and unifying the programming model of messaging

(2), the official website

Spring. IO/projects/sp… Cloud. Spring. IO/spring – clou…Spring Cloud Stream Tutorial in ChineseM.wang1314.com/doc/webapp/…

2. Design idea

① standard MQ

How messages in a Message channel are consumed, and who is responsible for sending and receiving the messages: The SubscribableChannel subinterface of the MessageChannel is subscribed to by the MessageHandler MessageHandler

②、为什么用Cloud Stream

Why should a STREAM unify underlying differences Binder INPUT corresponds to the consumer and OUTPUT corresponds to the producer

③ Message communication in Stream follows the publish-subscribe model

Topic topics broadcast are exchanges in RabbitMQ and topics in Kafka

3. Spring Cloud Stream standard process routines

(1), Binder,

Very convenient connection middleware, shielding differences

(2), the Channel,

Channel, an abstraction of Queue, is the medium of storage and forwarding in message communication system. The Queue is configured through Channel

③ Source and Sink

Simply put, the reference object is the Spring Cloud Stream itself. Publishing messages from the Stream is the output, and receiving messages is the input

4. Coding apis and common annotations

2. Case Description

Install the RabbitMQ environment Three new sub-modules, Cloud-stream-rabbitmq-provider8801, are created as the producer to send messages, cloud-stream-rabbitmq-consumer8802, as the message receiving module Cloud-stream-rabbitmq-consumer8803, as the message receiving module

Message-driven producers

1. Create a Module

cloud-stream-rabbitmq-provider8801

2, POM


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2020</artifactId>
        <groupId>com.atguigu.springcloud</groupId>
        <version>1.0 the SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>com.atguigu.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>${project.version}</version>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    
        <! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

</project>
Copy the code

3, YML

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  cloud:
    stream:
      binders: Configure the rabbitMQ service to bind to rabbitMQ.
        defaultRabbit: # represents the name of the definition used for binding consolidation
          type: rabbit Message component type
          environment: Set the environment configuration for RabbitMQ
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration processing
        output: This name is the name of a channel
          destination: studyExchange # indicates the Exchange name definition to use
          content-type: application/json # set message type to JSON and text to "text/plain"
          binder: defaultRabbit  Set the specific Settings for the message service to bind

eureka:
  client: The client is configured to register with Eureka
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 Set heartbeat interval (default: 30 seconds)
    lease-expiration-duration-in-seconds: 5 # If the 5 seconds interval is now exceeded (default is 90 seconds)
    instance-id: send-8801.com  Display host name in info list
    prefer-ip-address: true     Change the access path to an IP address

Copy the code

4. The main startup class StreamMQMain8801

package com.atguigu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8801 {
    public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); }}Copy the code

5. Business class

1. Interface for sending messages
package com.atguigu.springcloud.service;


public interface IMessageProvider
{
    public String send(a);
}

Copy the code
②, send message interface implementation class
package com.atguigu.springcloud.service.impl;

import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import javax.annotation.Resource;
import org.springframework.cloud.stream.messaging.Source;

import javax.annotation.Resource;
import java.util.UUID;

@EnableBinding(Source.class) // Define the push pipe for the message
public class MessageProviderImpl implements IMessageProvider
{
    @Resource
    private MessageChannel output; // Message sending pipeline

    @Override
    public String send(a)
    {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("*****serial: "+serial);
        return null; }}Copy the code
(3), the Controller
package com.atguigu.springcloud.controller;

import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import javax.annotation.Resource;

@RestController
public class SendMessageController
{
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping(value = "/sendMessage")
    public String sendMessage(a)
    {
        returnmessageProvider.send(); }}Copy the code

6, test,

①, start 7001Eureka
② Start RabbitMQ

rabbitmq-plugins enable rabbitmq_management http://localhost:15672/

③ Start 8801
(4), access,

http://localhost:8801/sendMessage

4. Message driven consumers

1. Create a Module

cloud-stream-rabbitmq-consumer8802

2, POM


      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2020</artifactId>
        <groupId>com.atguigu.springcloud</groupId>
        <version>1.0 the SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>

    <dependencies>


        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>

        <dependency>
            <groupId>com.atguigu.springcloud</groupId>
            <artifactId>cloud-api-commons</artifactId>
            <version>${project.version}</version>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project> 

Copy the code

3, YML

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  cloud:
    stream:
      binders: Configure the rabbitMQ service to bind to rabbitMQ.
        defaultRabbit: # represents the name of the definition used for binding consolidation
          type: rabbit Message component type
          environment: Set the environment configuration for RabbitMQ
            spring:
              rabbitmq:
                host: localhost
                port: 5672
                username: guest
                password: guest
      bindings: # Service integration processing
        input: This name is the name of a channel
          destination: studyExchange # indicates the Exchange name definition to use
          content-type: application/json # set message type to JSON and text to "text/plain"
          binder: defaultRabbit  Set the specific Settings for the message service to bind

eureka:
  client: The client is configured to register with Eureka
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 Set heartbeat interval (default: 30 seconds)
    lease-expiration-duration-in-seconds: 5 # If the 5 seconds interval is now exceeded (default is 90 seconds)
    instance-id: receive-8802.com  Display host name in info list
    prefer-ip-address: true     Change the access path to an IP address
Copy the code

StreamMQMain8802

package com.atguigu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class StreamMQMain8802 {

    public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); }}Copy the code

5. Business class

package com.atguigu.springcloud.controller;

import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;

@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message) {
        System.out.println("Consumer 1, accept:"+message.getPayload()+"\t port:"+serverPort); }}Copy the code

6, test 8801 send 8802 receive messages

http://localhost:8801/sendMessage

Group consumption and persistence

1. Clone a copy of 8803 according to 8802

cloud-stream-rabbitmq-consumer8803

(1), POM

      
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2020</artifactId>
        <groupId>com.atguigu.springcloud</groupId>
        <version>1.0 the SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8803</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <! -- Basic Configuration -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
Copy the code
(2), YML
server:
  port: 8803

spring:
  application:
    name: cloud-stream-consumer
  cloud:
      stream:
        binders: Configure the rabbitMQ service to bind to rabbitMQ.
          defaultRabbit: # represents the name of the definition used for binding consolidation
            type: rabbit Message component type
            environment: Set the environment configuration for RabbitMQ
              spring:
                rabbitmq:
                  host: localhost
                  port: 5672
                  username: guest
                  password: guest
        bindings: # Service integration processing
          input: This name is the name of a channel
            destination: studyExchange # indicates the Exchange name definition to use
            content-type: application/json # set the message type, this time to object JSON, if it is text set "text/plain"
            binder: defaultRabbit Set the specific Settings for the message service to bind
            group: atguiguA

eureka:
  client: The client is configured to register with Eureka
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 Set heartbeat interval (default: 30 seconds)
    lease-expiration-duration-in-seconds: 5 # If the 5 seconds interval is now exceeded (default is 90 seconds)
    instance-id: receive-8803.com  Display host name in info list
    prefer-ip-address: true     Change the access path to an IP address
Copy the code
③ Main boot class
package com.atguigu.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;


@SpringBootApplication
public class StreamMQMain8803
{
    public static void main(String[] args)
    { SpringApplication.run(StreamMQMain8803.class,args); }}Copy the code
④ Business class
package com.atguigu.springcloud.controller;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
 
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{
    @Value("${server.port}")
    private String serverPort;


    @StreamListener(Sink.INPUT)
    public void input(Message<String> message)
    {
        System.out.println("Consumer 2,-----> Received message:"+message.getPayload()+"\t port: "+serverPort); }}Copy the code

2, start,

RabbitMQ service registers 7001 message production 8801 message consumption 8802 message consumption 8803

3. Run the last two problems

There are repeated consumption issues, message persistence issues

4, consumption

Currently, 8802/8803 have been received at the same time, and there is a problem of repeated consumptionhttp://localhost:8801/sendMessageHow to solve it? Grouping and persisting attribute groups

5, grouping

Principle: Microservice applications are placed in the same group to ensure that messages are consumed only once by one of the applications. Different groups can consume, there will be competition within the same group, only one of them can consume. 8802/8803 are all going to be different groups, different groups. Group: atguiguA, atguiguB 8802 modify YML

group:  atguiguA
Copy the code

8803 modified YML

group:  atguiguB
Copy the code

We configure ourselvesConclusion: Repeat consumption 8802/8803 implements polling grouping, each time only one message sent by the consumer module 8801 can only be received by either 8802 or 8803, thus avoiding repeated consumption

6. Persistence

①, through the above, to solve the problem of repeated consumption, and then look at persistence
② Stop 8802/8803 and remove group atguiguA from 8802

8803 group group:atguiguA not removed

③ 8801 sends four messages to RabbitMQ
④ Start 8802 first, no group attribute configuration, no messages out in the background
⑤, start 8803 first, have group attribute configuration, background type out MQ message

Next up:SpringCloud Sleuth distributed request link tracing