preface

RabbitMQ introduction

I. Use scenarios

RabbitMQ is message-oriented middleware that implements AMQP (Advanced Message Queuing Protocol). Its primary role is to act as a message buffer, which lets applications communicate asynchronously and stay decoupled. It originated in financial systems and is used to store and forward messages in distributed systems. It performs well in terms of usability, scalability, and high availability.

RabbitMQ is mainly used to decouple systems in both directions. When producers generate data faster than consumers can process it, an intermediate layer is needed to hold that data.

AMQP is an open application-layer protocol standard designed for message-oriented middleware. Such middleware decouples components so that the sender of a message does not need to know about its consumer, and vice versa. The main characteristics of AMQP are message orientation, queuing, routing (including point-to-point and publish/subscribe), reliability, and security. See RabbitMQ's official guide for the detailed concepts.

II. Related concepts

A queue service usually involves three concepts: sender, queue, and receiver. RabbitMQ abstracts this further by adding an exchange between the sender and the queue. The sender therefore never talks to the queue directly. Instead, it sends the message to the exchange, which forwards the message to a queue according to its routing rules.

There are four important concepts: virtual host, exchange, queue, and binding.

  • Virtual host (vhost): A virtual host holds a set of exchanges, queues, and bindings. Why have multiple virtual hosts? Because RabbitMQ scopes user permissions at the granularity of the virtual host. If you want to deny group A access to group B's exchanges, queues, and bindings, you must create a separate virtual host for each group. Every RabbitMQ server has a default virtual host.
  • Exchange: An exchange forwards messages but does not store them. If no queue is bound to the exchange, it simply discards the messages sent by the producer. An important concept here is the routing key: when a message arrives at the exchange, the exchange forwards it to the matching queue, and the routing key determines which queue that is.
  • Binding: An exchange must be bound to queues, as shown in the figure above.

Exchange

The exchange receives messages and forwards them to the bound queues. The exchange does not store messages. If ACK mode is enabled, the exchange returns an error when it cannot find a matching queue. There are four types of exchange: Direct, Topic, Headers, and Fanout.

  • Direct: The behavior of the Direct type is "match first, then deliver". A routing key is set at binding time, and the exchange sends a message to the bound queue only when the message's routing key matches it.
  • Topic: Forwards messages according to pattern rules (the most flexible type)
  • Headers: Routes on the message's header attributes instead of the routing key
  • Fanout: Forwards messages to all bound queues

Direct Exchange

Direct Exchange is RabbitMQ's default exchange mode and also the simplest: it finds queues by matching the routing key exactly, as a whole string.

The first binding, from X to Q1, has the binding key orange. The binding from X to Q2 has two binding keys, black and green. When the routing key of a message equals a binding key, we know which queue the message will go to.

P.S. Why are there two binding keys, black and green, from X to Q2? Mainly because there may also be a Q3 that accepts only black messages, while Q2 accepts both black and green messages.
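The orange/black/green example can be sketched as a toy in-memory model. This is only an illustration of exact-match routing, not RabbitMQ's implementation; the class name DirectRoutingSketch and the Q3 binding are assumptions made for the example (Q3 is the hypothetical queue mentioned in the P.S.).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Toy model of a direct exchange (hypothetical, for illustration only).
class DirectRoutingSketch {
    // queue name -> binding keys registered for that queue
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(queue, q -> new LinkedHashSet<>()).add(bindingKey);
    }

    // A queue receives the message only if one of its binding keys
    // equals the routing key exactly.
    List<String> route(String routingKey) {
        List<String> targets = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : bindings.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                targets.add(entry.getKey());
            }
        }
        return targets;
    }

    // Builds the bindings described in the text, plus the hypothetical Q3.
    static DirectRoutingSketch example() {
        DirectRoutingSketch x = new DirectRoutingSketch();
        x.bind("Q1", "orange");
        x.bind("Q2", "black");
        x.bind("Q2", "green");
        x.bind("Q3", "black");
        return x;
    }

    public static void main(String[] args) {
        DirectRoutingSketch x = example();
        System.out.println(x.route("orange")); // [Q1]
        System.out.println(x.route("black"));  // [Q2, Q3]
        System.out.println(x.route("purple")); // []
    }
}
```

A routing key that matches no binding key reaches no queue, which corresponds to the exchange discarding the message.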

Topic Exchange

Messages are forwarded to queues based on wildcards. With this type of exchange, the binding between a queue and the exchange defines a routing pattern, and the exchange forwards a message to the queue only if the message's routing key matches that pattern. Routing keys consist of words separated by dots.

  • * (asterisk) stands for exactly one word.
  • # (hash) stands for zero or more words.
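The two wildcard rules can be sketched as a small recursive matcher over the dot-separated words. This is a hedged illustration of the rules only, not the broker's algorithm; the class name TopicPatternSketch is an assumption made for the example.

```java
// Illustrative matcher for topic patterns (hypothetical helper, not RabbitMQ code).
class TopicPatternSketch {

    static boolean matches(String pattern, String routingKey) {
        String[] p = pattern.isEmpty() ? new String[0] : pattern.split("\\.", -1);
        String[] k = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.", -1);
        return match(p, 0, k, 0);
    }

    private static boolean match(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            return j == k.length; // pattern used up: the key must be used up too
        }
        if (p[i].equals("#")) {
            // '#' either absorbs zero words (move past it)
            // or absorbs one more word and stays active.
            return match(p, i + 1, k, j) || (j < k.length && match(p, i, k, j + 1));
        }
        if (j == k.length) {
            return false; // pattern still needs a word, but the key has none left
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            return match(p, i + 1, k, j + 1); // '*' or a literal consumes exactly one word
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("topic.#", "topic.message"));        // true
        System.out.println(matches("topic.#", "topic"));                // true
        System.out.println(matches("topic.*", "topic"));                // false
        System.out.println(matches("topic.*", "topic.a.b"));            // false
        System.out.println(matches("topic.message", "topic.messages")); // false
    }
}
```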

Headers Exchange

The headers exchange also matches by rule, but with custom matching rules on message headers rather than the routing_key used by direct and topic. When a queue is bound to the exchange, a set of key-value pairs is given as the rule, and each message carries its own set of key-value pairs (its headers attributes). Depending on whether the binding requires any one pair or all pairs to match (the x-match binding argument), the message is delivered to the corresponding queue when one or all of these key-value pairs match.
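The any/all rule can be illustrated with a short sketch. The class name HeadersMatchSketch and the boolean matchAll parameter are assumptions made for the example; in RabbitMQ the choice is expressed through the x-match binding argument rather than a boolean.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

// Illustrative headers matching (hypothetical helper, not RabbitMQ code).
class HeadersMatchSketch {

    // bindingRules: key-value pairs set when the queue was bound.
    // headers:      key-value pairs carried by the message.
    // matchAll:     true = every rule must match, false = one matching rule is enough.
    static boolean matches(Map<String, Object> bindingRules,
                           Map<String, Object> headers,
                           boolean matchAll) {
        int hits = 0;
        for (Map.Entry<String, Object> rule : bindingRules.entrySet()) {
            if (headers.containsKey(rule.getKey())
                    && Objects.equals(rule.getValue(), headers.get(rule.getKey()))) {
                hits++;
            }
        }
        return matchAll ? hits == bindingRules.size() : hits > 0;
    }

    public static void main(String[] args) {
        Map<String, Object> rules = new HashMap<>();
        rules.put("format", "pdf");
        rules.put("type", "report");

        Map<String, Object> headers = new HashMap<>();
        headers.put("format", "pdf");
        headers.put("type", "log");

        System.out.println(matches(rules, headers, false)); // true: "format" matches
        System.out.println(matches(rules, headers, true));  // false: "type" differs
    }
}
```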

Fanout Exchange

This is the message broadcast model, which corresponds to the familiar publish/subscribe model. A fanout exchange sends every message to all queues bound to it, regardless of routing key or routing pattern. If a routing_key is configured, it is ignored.
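A minimal sketch of the broadcast behavior, assuming a toy in-memory exchange. The class name FanoutSketch and its methods are hypothetical; the point is only that the routing key plays no part in the delivery decision.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a fanout exchange (hypothetical, for illustration only).
class FanoutSketch {
    // queue name -> messages delivered to that queue
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    void bind(String queue) {
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    // The routing key is accepted but never inspected:
    // every bound queue receives its own copy of the message.
    int publish(String routingKey, String body) {
        for (List<String> queue : queues.values()) {
            queue.add(body);
        }
        return queues.size();
    }

    List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, new ArrayList<>());
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("fanout.A");
        exchange.bind("fanout.B");
        exchange.bind("fanout.C");
        System.out.println(exchange.publish("ignored.key", "hi, fanout msg")); // 3
        System.out.println(exchange.messagesIn("fanout.B")); // [hi, fanout msg]
    }
}
```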

Message acknowledgment

How does a message consumer notify RabbitMQ that a message was consumed successfully?

Every message must be acknowledged, either automatically or manually. With automatic acknowledgment, the message is acknowledged as soon as it is sent to the consumer, so it can be lost: if the consumer fails to process it, the message is gone. Likewise, if the message has been processed but later code throws an exception, the consumer's business logic is rolled back when Spring manages it, which in effect also loses the message. With manual acknowledgment, the consumer calls ack, nack, or reject to confirm the message, so it can also respond after a business operation fails. If a consumer does not acknowledge, RabbitMQ will not send it any more data, because RabbitMQ considers that consumer's processing capacity to be limited. The ACK mechanism can therefore also be used to throttle traffic, for example by sleeping for a few seconds after a message is received.

  • AcknowledgeMode.NONE: automatic acknowledgment
  • AcknowledgeMode.AUTO: acknowledgment depends on the outcome (the listener container acks or nacks according to whether the listener completes normally)
  • AcknowledgeMode.MANUAL: manual acknowledgment
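The difference between automatic and manual acknowledgment can be sketched with a toy queue. This is a simplified model under stated assumptions, not the broker's behavior in full: the class AckSketch and its methods are hypothetical, and real brokers also handle consumer disconnects, prefetch limits, and redelivery flags.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Toy queue that tracks unacknowledged deliveries (hypothetical, for illustration only).
class AckSketch {
    private final Deque<String> ready = new ArrayDeque<>();
    private final Map<Long, String> unacked = new HashMap<>();
    private long nextTag = 1;

    void publish(String body) {
        ready.addLast(body);
    }

    // Hands out the next message and returns its delivery tag, or -1 if the queue is empty.
    // With autoAck the message is forgotten immediately; otherwise it is kept until
    // the consumer answers with ack or nack.
    long deliver(boolean autoAck) {
        String body = ready.pollFirst();
        if (body == null) {
            return -1;
        }
        long tag = nextTag++;
        if (!autoAck) {
            unacked.put(tag, body);
        }
        return tag;
    }

    void ack(long tag) {
        unacked.remove(tag);
    }

    // A negative acknowledgment can put the message back at the head of the queue.
    void nack(long tag, boolean requeue) {
        String body = unacked.remove(tag);
        if (body != null && requeue) {
            ready.addFirst(body);
        }
    }

    int readyCount() {
        return ready.size();
    }

    int unackedCount() {
        return unacked.size();
    }

    public static void main(String[] args) {
        AckSketch auto = new AckSketch();
        auto.publish("order-1");
        auto.deliver(true); // the consumer then fails while processing
        System.out.println(auto.readyCount() + auto.unackedCount()); // 0: message lost

        AckSketch manual = new AckSketch();
        manual.publish("order-1");
        long tag = manual.deliver(false);
        manual.nack(tag, true); // the consumer failed and asks for a requeue
        System.out.println(manual.readyCount()); // 1: message available again
    }
}
```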

Integrating RabbitMQ with Spring Boot

Configure the POM by adding spring-boot-starter-amqp support. This example is based on Spring Boot 2.1.4.

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>

Configure the YAML file for Spring Boot.

    server:
      servlet:
        context-path: /rabbitmq
      port: 9004
    spring:
      application:
        name: rabbitmq
      rabbitmq:
        host: localhost
        virtual-host: /crawl
        username: xxxx
        password:   # value not preserved in the source
        port: 5672
        publisher-returns: true
        # confirm that messages are sent correctly
        publisher-confirms: true
        template:
          retry:
            enabled: true
            initial-interval: 2s
        listener:
          simple:
            acknowledge-mode: manual

We also need to configure a confirm callback by implementing the RabbitTemplate.ConfirmCallback interface. The callback is triggered after the message has been sent to the broker, and it only confirms whether the message reached the exchange correctly.

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;

    /**
     * @author lijianqing
     * @version 1.0
     * @ClassName RabbitTemplateConfirmCallback
     * @date 2019/4/23 12:55
     */
    @Component
    @Slf4j
    public class RabbitTemplateConfirmCallback implements RabbitTemplate.ConfirmCallback {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @PostConstruct
        public void init() {
            // Register this bean as the ConfirmCallback
            rabbitTemplate.setConfirmCallback(this);
        }

        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            log.info("message unique identifier: {}, confirm result: {}, failure cause: {}", correlationData, ack, cause);
        }
    }

A return callback handles messages that fail to be routed to a queue: it is triggered when a message is sent to the exchange successfully but no matching queue exists.

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import javax.annotation.PostConstruct;

    /**
     * @author lijianqing
     * @version 1.0
     * @ClassName RabbitTemplateReturnCallback
     * @date 2019/4/23 12:55
     */
    @Component
    @Slf4j
    public class RabbitTemplateReturnCallback implements RabbitTemplate.ReturnCallback {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        @PostConstruct
        public void init() {
            // Register this bean as the ReturnCallback
            rabbitTemplate.setReturnCallback(this);
            // Returns are only delivered when the mandatory flag is set
            rabbitTemplate.setMandatory(true);
        }

        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            log.info("Message body: " + message);
            log.info("Reply code: " + replyCode);
            log.info("Description: " + replyText);
            log.info("Exchange used for the message: " + exchange);
            log.info("Routing key: " + routingKey);
        }
    }

1. Simple queue

The diagram below:

"P" is our producer and "C" is our consumer. The box in the middle is a queue, a message buffer that RabbitMQ keeps on behalf of the consumer.

Add SimpleConfig to create the queue we want to publish to.

    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * Messages are published directly to the queue
     * @author lijianqing
     * @version 1.0
     * @ClassName SimpleConfig
     * @date 2019/4/26
     */
    @Configuration
    public class SimpleConfig {

        @Bean
        public Queue simpleQueue() {
            return new Queue("simple");
        }
    }

Create message sender and message receiver respectively:

  • Message sender

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import zero.springboot.study.rabbitmq.model.User;
    import java.util.UUID;

    /**
     * @author lijianqing
     * @version 1.0
     * @ClassName HelloSender
     * @date 2019/4/23
     */
    @Component
    @Slf4j
    public class HelloSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send() {
            User user = new User();
            user.setName("green");
            user.setPass("111111");
            // Send a User message to the "simple" queue, the queue declared in
            // SimpleConfig and consumed by HelloReceiver
            log.info("Send message: {}", user);
            rabbitTemplate.convertAndSend("simple", user, new CorrelationData(UUID.randomUUID().toString()));

            // Send a plain String message to the same queue
            String msg = "hello qing";
            log.info("Send message: {}", msg);
            rabbitTemplate.convertAndSend("simple", msg);
        }
    }
  • Message receiver

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    import zero.springboot.study.rabbitmq.model.User;
    import java.io.IOException;

    /**
     * Listens on the simple queue
     *
     * @author lijianqing
     * @version 1.0
     * @ClassName HelloReceiver
     * @date 2019/4/23 11:42
     */
    @Component
    @Slf4j
    @RabbitListener(queues = "simple")
    public class HelloReceiver {

        @RabbitHandler
        public void processUser(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            log.info("Received message: {}", user);
            // Acknowledge manually
            try {
                // Positive acknowledgment: the consumer confirms receipt of the current
                // message and signals that it has been processed successfully.
                channel.basicAck(tag, false);
                // Negative acknowledgment. The second parameter indicates whether to reject
                // multiple messages at once; the third, whether to requeue the message.
                // channel.basicNack(tag, false, false);
                // Reject the current message. The second parameter indicates whether
                // the message is requeued.
                // channel.basicReject(tag, false);
            } catch (IOException e) {
                log.error("Failed to acknowledge message", e);
            }
        }

        @RabbitHandler
        public void processString(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            log.info("Received message: {}", message);
            // Acknowledge manually; basicNack and basicReject can be used
            // here in the same way as in processUser.
            try {
                channel.basicAck(tag, false);
            } catch (IOException e) {
                log.error("Failed to acknowledge message", e);
            }
        }
    }

This implements the simplest pattern, in which messages are sent to a specified queue. The test class that exercises it is shown in the Unit testing section at the end.

2. Direct Exchange mode

Configure the direct exchange and create a queue that is bound to the exchange with a routing key.

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.DirectExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * @author lijianqing
     * @version 1.0
     * @ClassName DirectConfig
     * @date 2019/4/23 11:15
     */
    @Configuration
    public class DirectConfig {

        // Queue name
        public static final String QUEUE_NAME = "direct_name";
        // Exchange name
        public static final String EXCHANGE = "zero-exchange";
        // Routing key name
        public static final String ROUTING_KEY = "routingKey";

        @Bean
        public Queue blueQueue() {
            // true = durable queue
            return new Queue(QUEUE_NAME, true);
        }

        @Bean
        public DirectExchange defaultExchange() {
            return new DirectExchange(EXCHANGE);
        }

        @Bean
        public Binding bindingBlue() {
            return BindingBuilder.bind(blueQueue()).to(defaultExchange()).with(ROUTING_KEY);
        }
    }

Next we create producers and consumers

  • Producer

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.connection.CorrelationData;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;
    import zero.springboot.study.rabbitmq.config.DirectConfig;
    import zero.springboot.study.rabbitmq.model.User;
    import java.util.UUID;

    /**
     * @author lijianqing
     * @version 1.0
     * @ClassName DirectSender
     * @date 2019/4/23
     */
    @Component
    @Slf4j
    public class DirectSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send() {
            User user = new User();
            user.setName("green");
            user.setPass("111111");
            // Send a User message to the direct exchange with the configured routing key
            log.info("DirectSender sends message: {}", user);
            rabbitTemplate.convertAndSend(DirectConfig.EXCHANGE, DirectConfig.ROUTING_KEY, user,
                    new CorrelationData(UUID.randomUUID().toString()));

            // Send a plain String message the same way
            String msg = "hello qing";
            log.info("DirectSender sends message: {}", msg);
            rabbitTemplate.convertAndSend(DirectConfig.EXCHANGE, DirectConfig.ROUTING_KEY, msg);
        }
    }
  • Consumer

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    import zero.springboot.study.rabbitmq.model.User;
    import java.io.IOException;

    /**
     * @author lijianqing
     * @version 1.0
     * @ClassName DirectReceiver
     * @date 2019/4/23 11:42
     */
    @Component
    @Slf4j
    @RabbitListener(queues = "direct_name")
    public class DirectReceiver {

        @RabbitHandler
        public void processUser(User user, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            log.info("DirectReceiver receives message: {}", user);
            // Acknowledge manually
            try {
                // Positive acknowledgment: the consumer confirms receipt of the current
                // message and signals that it has been processed successfully.
                channel.basicAck(tag, false);
                // Negative acknowledgment. The second parameter indicates whether to reject
                // multiple messages at once; the third, whether to requeue the message.
                // channel.basicNack(tag, false, false);
                // Reject the current message. The second parameter indicates whether
                // the message is requeued.
                // channel.basicReject(tag, false);
            } catch (IOException e) {
                log.error("Failed to acknowledge message", e);
            }
        }

        @RabbitHandler
        public void processString(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            log.info("DirectReceiver receives message: {}", message);
            // Acknowledge manually; basicNack and basicReject can be used
            // here in the same way as in processUser.
            try {
                channel.basicAck(tag, false);
            } catch (IOException e) {
                log.error("Failed to acknowledge message", e);
            }
        }
    }

3. Topic Exchange mode

Create the queues and the exchange. Each queue is bound to the exchange with a routing pattern.

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.Queue;
    import org.springframework.amqp.core.TopicExchange;
    import org.springframework.beans.factory.annotation.Qualifier;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    /**
     * queueMessages matches topic.#, queueMessage matches only "topic.message".
     *
     * @author lijianqing
     * @version 1.0
     * @ClassName TopicRabbitConfig
     * @date 2019/4/23 15:03
     */
    @Configuration
    public class TopicRabbitConfig {

        final static String message = "topic.message";
        final static String messages = "topic.messages";

        @Bean
        public Queue queueMessage() {
            return new Queue(TopicRabbitConfig.message);
        }

        @Bean
        public Queue queueMessages() {
            return new Queue(TopicRabbitConfig.messages);
        }

        @Bean
        TopicExchange exchange() {
            return new TopicExchange("topicExchange");
        }

        @Bean
        Binding bindingExchangeMessage(@Qualifier("queueMessage") Queue queueMessage, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");
        }

        @Bean
        Binding bindingExchangeMessages(@Qualifier("queueMessages") Queue queueMessages, TopicExchange exchange) {
            return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");
        }
    }
  • Create the producer

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    /**
     * @author lijianqing
     * @version 1.0
     * @ClassName TopicSender
     * @date 2019/4/23
     */
    @Component
    @Slf4j
    public class TopicSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        /**
         * Matches topic.message and topic.#, so both queues receive it
         */
        public void send1() {
            String context = "hi, i am message 1";
            log.info("Topic send: {}", context);
            rabbitTemplate.convertAndSend("topicExchange", "topic.message", context);
        }

        /**
         * Matches only topic.#, so only the topic.messages queue receives it
         */
        public void send2() {
            String context = "hi, i am messages 2";
            log.info("Topic send: {}", context);
            rabbitTemplate.convertAndSend("topicExchange", "topic.messages", context);
        }
    }
  • Create the consumers. Here we create one consumer for each of the two queues.

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    import java.io.IOException;

    @Component
    @RabbitListener(queues = "topic.message")
    @Slf4j
    public class TopicReceiver {

        @RabbitHandler
        public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            log.info("topic.message Receiver1 : {}", message);
            // Acknowledge manually
            try {
                // Positive acknowledgment: the consumer confirms receipt of the current
                // message and signals that it has been processed successfully.
                channel.basicAck(tag, false);
                // Negative acknowledgment. The second parameter indicates whether to reject
                // multiple messages at once; the third, whether to requeue the message.
                // channel.basicNack(tag, false, false);
                // Reject the current message. The second parameter indicates whether
                // the message is requeued.
                // channel.basicReject(tag, false);
            } catch (IOException e) {
                log.error("Failed to acknowledge message", e);
            }
        }
    }

Second consumer

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    import java.io.IOException;

    @Component
    @RabbitListener(queues = "topic.messages")
    @Slf4j
    public class TopicReceiver2 {

        @RabbitHandler
        public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            log.info("topic.messages Receiver2 : {}", message);
            // Acknowledge manually
            try {
                // Positive acknowledgment: the consumer confirms receipt of the current
                // message and signals that it has been processed successfully.
                channel.basicAck(tag, false);
                // Negative acknowledgment. The second parameter indicates whether to reject
                // multiple messages at once; the third, whether to requeue the message.
                // channel.basicNack(tag, false, false);
                // Reject the current message. The second parameter indicates whether
                // the message is requeued.
                // channel.basicReject(tag, false);
            } catch (IOException e) {
                log.error("Failed to acknowledge message", e);
            }
        }
    }

4. Fanout mode

This is the publish/subscribe mode. All queues bound to the exchange receive every message, and any routing key specified by the sender is ignored.

Configure the exchange and the queues

    import org.springframework.amqp.core.Binding;
    import org.springframework.amqp.core.BindingBuilder;
    import org.springframework.amqp.core.FanoutExchange;
    import org.springframework.amqp.core.Queue;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class FanoutRabbitConfig {

        @Bean
        public Queue AMessage() {
            return new Queue("fanout.A");
        }

        @Bean
        public Queue BMessage() {
            return new Queue("fanout.B");
        }

        @Bean
        public Queue CMessage() {
            return new Queue("fanout.C");
        }

        @Bean
        FanoutExchange fanoutExchange() {
            return new FanoutExchange("fanoutExchange");
        }

        // The parameter names match the queue bean names above, which is how
        // Spring picks the right Queue for each binding. No routing key is needed.
        @Bean
        Binding bindingExchangeA(Queue AMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(AMessage).to(fanoutExchange);
        }

        @Bean
        Binding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(BMessage).to(fanoutExchange);
        }

        @Bean
        Binding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {
            return BindingBuilder.bind(CMessage).to(fanoutExchange);
        }
    }
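  • Create the sender

    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.stereotype.Component;

    @Component
    @Slf4j
    public class FanoutSender {

        @Autowired
        private RabbitTemplate rabbitTemplate;

        public void send() {
            String context = "hi, fanout msg ";
            // The routing key is left empty because a fanout exchange ignores it
            rabbitTemplate.convertAndSend("fanoutExchange", "", context);
        }
    }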
  • Create the consumers for queues A, B, and C

    import com.rabbitmq.client.Channel;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.rabbit.annotation.RabbitHandler;
    import org.springframework.amqp.rabbit.annotation.RabbitListener;
    import org.springframework.amqp.support.AmqpHeaders;
    import org.springframework.messaging.handler.annotation.Header;
    import org.springframework.stereotype.Component;
    import java.io.IOException;

    @Component
    @RabbitListener(queues = "fanout.A")
    @Slf4j
    public class FanoutReceiverA {

        @RabbitHandler
        public void process(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) {
            log.info("fanout Receiver A : {}", message);
            // Acknowledge manually
            try {
                // Positive acknowledgment: the consumer confirms receipt of the current
                // message and signals that it has been processed successfully.
                channel.basicAck(tag, false);
                // Negative acknowledgment. The second parameter indicates whether to reject
                // multiple messages at once; the third, whether to requeue the message.
                // channel.basicNack(tag, false, false);
                // Reject the current message. The second parameter indicates whether
                // the message is requeued.
                // channel.basicReject(tag, false);
            } catch (IOException e) {
                log.error("Failed to acknowledge message", e);
            }
        }
    }

The consumers for queues B and C follow the same pattern and are not repeated here.

Unit testing

    import org.junit.Test;
    import org.junit.runner.RunWith;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.test.context.SpringBootTest;
    import org.springframework.test.context.junit4.SpringRunner;
    import zero.springboot.study.rabbitmq.direct.DirectSender;
    import zero.springboot.study.rabbitmq.fanout.FanoutSender;
    import zero.springboot.study.rabbitmq.simple.HelloSender;
    import zero.springboot.study.rabbitmq.topic.TopicSender;

    @RunWith(SpringRunner.class)
    @SpringBootTest(classes = RabbitmqApplication.class)
    public class RabbitmqApplicationTests {

        @Autowired
        private DirectSender directSender;

        @Autowired
        private TopicSender topicSender;

        @Autowired
        private FanoutSender fanoutSender;

        @Autowired
        private HelloSender helloSender;

        @Test
        public void testDirect() {
            directSender.send();
        }

        @Test
        public void topic1() {
            topicSender.send1();
        }

        @Test
        public void topic2() {
            topicSender.send2();
        }

        @Test
        public void testFanout() {
            fanoutSender.send();
        }

        @Test
        public void testSimple() {
            helloSender.send();
        }
    }