The RabbitMQ integration SpringCloud
Hello! Welcome to Java growth notes, mainly for mutual communication, mutual learning, also hope to share can help everyone, if there is any mistake, hope to correct, thank you!
Integrate RabbitMQ introduction
1. SpringCloudStream is positioned to be compatible with the integration of mainstream messaging middleware and reduce the configuration of different messaging middleware integration. It is relatively easy to integrate RabbitMQ configuration and does not need to define the corresponding switch, queue, and relationship binding, which reduces the configuration.
2. SpringCloudStream can diversify message producers and consumers by means of superstructure processing, and does not need to be constrained to use the same message-oriented middleware on the production and consumer side. For example, RabbitMQ can be used on the production side and Kafka can be used on the consumer side, eliminating the need for developers to integrate different configurations and implementing high-performance production and consumption scenarios with only a few annotations.
3, of course, it also has a very big problem is that it can not achieve the reliability of message delivery, that is, can not guarantee the 100% reliability of the message, there will be a small number of message loss.
Importing dependency Configuration
Mainly depends on
// Specify a unified configuration
<properties>
<java.version>1.8</java.version>
<spring.cloud-version>Greenwich.SR6</spring.cloud-version>
<spring.boot-version>Brussels-SR17</spring.boot-version>
</properties>
/ / springcloud dependency
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-dependencies</artifactId>
<version>${spring.cloud-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
// Other configurations<dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> </dependencies>Copy the code
Use notes
@Output: Output annotation that defines the interface for sending messages@input: Enter annotations that define the consumer interface for the message@StreamListener: annotation used to define listening methods@EnableBinding: Starts the binding relationshipCopy the code
Message producer
Configuration description at the production end
Spring: Cloud: stream: binders: defaultRabbit: # Specifies the message type for bidding # rabbitmq addresses:127.0. 01.:5672Username: rabbitmq password:123456Virtual-host: / # Virtual path Bindings: userOutPutChannel: destination: Exchange_cloud # Exchange name, Exchange mode default topic, bind stream output channel to Exchange_cloud switch group: userGroup # group name, production end and consumer end names need to be the samedefault- Binder :defaultRabbit # and binders:defaultRabbit defined above need to be consistent content-type: application/json # Set message type to JSONCopy the code
Production code
The display is as follows:
// Define channels for output types
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.stereotype.Component;
@Component
public interface Barista {
final static String OUTPUT_CHANNEL = "userOutPutChannel";
/* * @Description: Define an output type of channel * @ Author ly * @ param [] * @ return org. Springframework. Messaging. The MessageChannel * @ date 2021/3/22 17:07 * /
@Output(Barista.OUTPUT_CHANNEL)
MessageChannel userOutPutChannel(a);
}
// Implement class encapsulation
import com.alibaba.fastjson.JSON;
import com.show.service.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;
import javax.annotation.Resource;
import java.util.Map;
@EnableBinding(Barista.class)
@Service
@Slf4j
public class RabbitSender {
@Resource
@Output(Barista.OUTPUT_CHANNEL)
private MessageChannel channel;
@Resource
private Barista barista;
/* * @description: channel Sends messages * @author ly * @date 2021/3/23 11:12 */
public void sendMsg (String msg) {
channel.send(MessageBuilder.withPayload(msg).build());
log.error("Channel message sent successfully: {}" + msg);
}
/* * @description: Barista sends a message * @author ly * @date 2021/3/23 11:12 */
public void sendMessage (Object message, Map<String, Object> properties) {
final MessageHeaders messageHeaders = new MessageHeaders(properties);
final Message msg = MessageBuilder.createMessage(message, messageHeaders);
final boolean sendResult = barista.userOutPutChannel().send(msg);
log.error("Barista message sent successfully :{},sendResult:{}"+ JSON.toJSONString(msg), sendResult); }}// Test class processing
import com.google.common.collect.ImmutableMap;
import com.show.model.User;
import com.show.service.impl.RabbitSender;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.test.context.junit4.SpringRunner;
import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Map;
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ECloudProducerApplicationTests.class)
@ComponentScan(basePackages = {"com.show.*"})
@Slf4j
public class ECloudProducerApplicationTests {
@Resource
private RabbitSender rabbitSender;
@Test
public void sendMessage (a) {
final String message = "Hello RabbitMQ";
rabbitSender.sendMsg(message);
}
@Test
public void sendRabbitMessage (a) {
final Map<String, Object> properties = ImmutableMap.of("cloud-stream"."cloud-stream");
final User user = new User("simon"."123456".22.new BigDecimal(100)); rabbitSender.sendMessage(user, properties); }}Copy the code
Consumer code
Configuration description of the consumer
Spring: cloud: stream: binders: defaultRabbit: #127.0. 01.:5672
username: rabbit
password: 123456Virtual-host: / Bindings: userInChannel: # Exchange_cloud # Exchange mode default topic, bind the message output channel of stream to exchange_cloud exchange group: userGroup # Application /json # Consume message type JSONdefaultBinder :defaultRabbit # Bind to binders:defaultRabbit1Bindings :userInChannel (bindings:userInChannel, bindings:userInChannel, bindings:userInChannel)false# SupportreturnAcknowledge -mode: MANUAL # acknowledge recovery-interval:3000# 3s reconnection durable- Subscription:trueMax -concurrency:5# Maximum number of listenersCopy the code
Consumer code
// Define an input channel
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;
import org.springframework.stereotype.Component;
@Component
public interface Barista {
String INPUT_CHANNEL = "userInChannel";
/* * @Description: Define the type of an input channel * @ Author ly * @ param [] * @ return org. Springframework. Messaging. The SubscribableChannel * @ date 2021/3/22 17:34 * /
@Input(Barista.INPUT_CHANNEL)
SubscribableChannel userInChannel(a);
}
// Define the input channel listener class
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import com.show.service.Barista;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
@EnableBinding(Barista.class)
@Service
@Slf4j
public class MQReceiver {
@StreamListener(Barista.INPUT_CHANNEL)
public void receiver(Message message) throws Exception {
Channel channel = (Channel) message.getHeaders().get(AmqpHeaders.CHANNEL);
Long deliveryTag = (Long) message.getHeaders().get(AmqpHeaders.DELIVERY_TAG);
log.error("Finished consumption: {}, Object: {}", System.currentTimeMillis(), JSON.toJSON(message));
channel.basicAck(deliveryTag, false); }}Copy the code
Example of receiving a message Object message return information example
The end of this chapter, will continue to update, share Java growth notes, I hope we can grow together. If you find my share useful, please remember to like and follow! This is the best encouragement for me. Thank you very much! PS: Reprint please indicate the source!