The last:SpringCloud Hoxton + SpringCloud Alibaba learning Notes (5) — Gateway new generation Gateway
A message driven overview
1. What is it
① in a word
Shielding the differences of underlying messaging middleware, reducing switching versions, and unifying the programming model of messaging
(2), the official website
Spring. IO/projects/sp… Cloud. Spring. IO/spring – clou…Spring Cloud Stream Tutorial in ChineseM.wang1314.com/doc/webapp/…
2. Design idea
① standard MQ
How messages in a Message channel are consumed, and who is responsible for sending and receiving the messages: The SubscribableChannel subinterface of the MessageChannel is subscribed to by the MessageHandler MessageHandler
②、为什么用Cloud Stream
Why should a STREAM unify underlying differences Binder INPUT corresponds to the consumer and OUTPUT corresponds to the producer
③ Message communication in Stream follows the publish-subscribe model
Topic topics broadcast are exchanges in RabbitMQ and topics in Kafka
3. Spring Cloud Stream standard process routines
(1), Binder,
Very convenient connection middleware, shielding differences
(2), the Channel,
Channel, an abstraction of Queue, is the medium of storage and forwarding in message communication system. The Queue is configured through Channel
③ Source and Sink
Simply put, the reference object is the Spring Cloud Stream itself. Publishing messages from the Stream is the output, and receiving messages is the input
4. Coding apis and common annotations
2. Case Description
Install the RabbitMQ environment Three new sub-modules, Cloud-stream-rabbitmq-provider8801, are created as the producer to send messages, cloud-stream-rabbitmq-consumer8802, as the message receiving module Cloud-stream-rabbitmq-consumer8803, as the message receiving module
Message-driven producers
1. Create a Module
cloud-stream-rabbitmq-provider8801
2, POM
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2020</artifactId>
<groupId>com.atguigu.springcloud</groupId>
<version>1.0 the SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-provider8801</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.atguigu.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<! -- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Copy the code
3, YML
server:
port: 8801
spring:
application:
name: cloud-stream-provider
cloud:
stream:
binders: Configure the rabbitMQ service to bind to rabbitMQ.
defaultRabbit: # represents the name of the definition used for binding consolidation
type: rabbit Message component type
environment: Set the environment configuration for RabbitMQ
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # Service integration processing
output: This name is the name of a channel
destination: studyExchange # indicates the Exchange name definition to use
content-type: application/json # set message type to JSON and text to "text/plain"
binder: defaultRabbit Set the specific Settings for the message service to bind
eureka:
client: The client is configured to register with Eureka
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 Set heartbeat interval (default: 30 seconds)
lease-expiration-duration-in-seconds: 5 # If the 5 seconds interval is now exceeded (default is 90 seconds)
instance-id: send-8801.com Display host name in info list
prefer-ip-address: true Change the access path to an IP address
Copy the code
4. The main startup class StreamMQMain8801
package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); }}Copy the code
5. Business class
1. Interface for sending messages
package com.atguigu.springcloud.service;
public interface IMessageProvider
{
public String send(a);
}
Copy the code
②, send message interface implementation class
package com.atguigu.springcloud.service.impl;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.integration.support.MessageBuilderFactory;
import org.springframework.messaging.MessageChannel;
import org.springframework.integration.support.MessageBuilder;
import javax.annotation.Resource;
import org.springframework.cloud.stream.messaging.Source;
import javax.annotation.Resource;
import java.util.UUID;
@EnableBinding(Source.class) // Define the push pipe for the message
public class MessageProviderImpl implements IMessageProvider
{
@Resource
private MessageChannel output; // Message sending pipeline
@Override
public String send(a)
{
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("*****serial: "+serial);
return null; }}Copy the code
(3), the Controller
package com.atguigu.springcloud.controller;
import com.atguigu.springcloud.service.IMessageProvider;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@RestController
public class SendMessageController
{
@Resource
private IMessageProvider messageProvider;
@GetMapping(value = "/sendMessage")
public String sendMessage(a)
{
returnmessageProvider.send(); }}Copy the code
6, test,
①, start 7001Eureka
② Start RabbitMQ
rabbitmq-plugins enable rabbitmq_management http://localhost:15672/
③ Start 8801
(4), access,
http://localhost:8801/sendMessage
4. Message driven consumers
1. Create a Module
cloud-stream-rabbitmq-consumer8802
2, POM
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2020</artifactId>
<groupId>com.atguigu.springcloud</groupId>
<version>1.0 the SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.cloud/spring-cloud-starter-eureka-server -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>com.atguigu.springcloud</groupId>
<artifactId>cloud-api-commons</artifactId>
<version>${project.version}</version>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-web -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-devtools -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<! -- https://mvnrepository.com/artifact/org.projectlombok/lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<! -- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-test -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Copy the code
3, YML
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: Configure the rabbitMQ service to bind to rabbitMQ.
defaultRabbit: # represents the name of the definition used for binding consolidation
type: rabbit Message component type
environment: Set the environment configuration for RabbitMQ
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # Service integration processing
input: This name is the name of a channel
destination: studyExchange # indicates the Exchange name definition to use
content-type: application/json # set message type to JSON and text to "text/plain"
binder: defaultRabbit Set the specific Settings for the message service to bind
eureka:
client: The client is configured to register with Eureka
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 Set heartbeat interval (default: 30 seconds)
lease-expiration-duration-in-seconds: 5 # If the 5 seconds interval is now exceeded (default is 90 seconds)
instance-id: receive-8802.com Display host name in info list
prefer-ip-address: true Change the access path to an IP address
Copy the code
StreamMQMain8802
package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); }}Copy the code
5. Business class
package com.atguigu.springcloud.controller;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message) {
System.out.println("Consumer 1, accept:"+message.getPayload()+"\t port:"+serverPort); }}Copy the code
6, test 8801 send 8802 receive messages
http://localhost:8801/sendMessage
Group consumption and persistence
1. Clone a copy of 8803 according to 8802
cloud-stream-rabbitmq-consumer8803
(1), POM
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>cloud2020</artifactId>
<groupId>com.atguigu.springcloud</groupId>
<version>1.0 the SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>cloud-stream-rabbitmq-consumer8803</artifactId>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<! -- Basic Configuration -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<scope>runtime</scope>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Copy the code
(2), YML
server:
port: 8803
spring:
application:
name: cloud-stream-consumer
cloud:
stream:
binders: Configure the rabbitMQ service to bind to rabbitMQ.
defaultRabbit: # represents the name of the definition used for binding consolidation
type: rabbit Message component type
environment: Set the environment configuration for RabbitMQ
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
bindings: # Service integration processing
input: This name is the name of a channel
destination: studyExchange # indicates the Exchange name definition to use
content-type: application/json # set the message type, this time to object JSON, if it is text set "text/plain"
binder: defaultRabbit Set the specific Settings for the message service to bind
group: atguiguA
eureka:
client: The client is configured to register with Eureka
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 Set heartbeat interval (default: 30 seconds)
lease-expiration-duration-in-seconds: 5 # If the 5 seconds interval is now exceeded (default is 90 seconds)
instance-id: receive-8803.com Display host name in info list
prefer-ip-address: true Change the access path to an IP address
Copy the code
③ Main boot class
package com.atguigu.springcloud;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
public class StreamMQMain8803
{
public static void main(String[] args)
{ SpringApplication.run(StreamMQMain8803.class,args); }}Copy the code
④ Business class
package com.atguigu.springcloud.controller;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;
@Component
@EnableBinding(Sink.class)
public class ReceiveMessageListenerController
{
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> message)
{
System.out.println("Consumer 2,-----> Received message:"+message.getPayload()+"\t port: "+serverPort); }}Copy the code
2, start,
RabbitMQ service registers 7001 message production 8801 message consumption 8802 message consumption 8803
3. Run the last two problems
There are repeated consumption issues, message persistence issues
4, consumption
Currently, 8802/8803 have been received at the same time, and there is a problem of repeated consumptionhttp://localhost:8801/sendMessageHow to solve it? Grouping and persisting attribute groups
5, grouping
Principle: Microservice applications are placed in the same group to ensure that messages are consumed only once by one of the applications. Different groups can consume, there will be competition within the same group, only one of them can consume. 8802/8803 are all going to be different groups, different groups. Group: atguiguA, atguiguB 8802 modify YML
group: atguiguA
Copy the code
8803 modified YML
group: atguiguB
Copy the code
We configure ourselvesConclusion: Repeat consumption 8802/8803 implements polling grouping, each time only one message sent by the consumer module 8801 can only be received by either 8802 or 8803, thus avoiding repeated consumption
6. Persistence
①, through the above, to solve the problem of repeated consumption, and then look at persistence
② Stop 8802/8803 and remove group atguiguA from 8802
8803 group group:atguiguA not removed