Maven dependency

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>3.0.4</version>
</dependency>

Java examples of seven working modes

1. Simple mode

Simple mode has exactly one producer and one consumer. The producer publishes a message to a queue, the consumer listens on that queue, and when a message arrives the consumer consumes it. Each message is one-shot: once it has been consumed, it is removed from the queue.

![](https://upload-images.jianshu.io/upload_images/24533109-82f171c0c41f29a3?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

1.1.1 EasyRecv.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;

public class EasyRecv {

// Queue name

private final static String QUEUE_NAME = "hello world";

public static void main(String[] argv) throws java.io.IOException,java.lang.InterruptedException {

// Open a connection and create a channel, as on the sender

ConnectionFactory factory = new ConnectionFactory();

// Set the IP address or host name of the RabbitMQ host

factory.setHost("127.0.0.1");

Connection connection = factory.newConnection();

Channel channel = connection.createChannel();

// Declare the queue here as well, so that it exists even if the receiver is started before the sender.

/**
 * queueDeclare arguments, in order:
 * 1. Queue name
 * 2. Durable: whether the queue survives a broker restart
 * 3. Exclusive: whether the queue may be used only by the connection that declared it
 * 4. Auto-delete: whether the queue is deleted once its last consumer unsubscribes
 * 5. Other arguments
 */

channel.queueDeclare(QUEUE_NAME,false,false,false, null);

System.out.println("Waiting for messages. To exit press CTRL+C");

// Create a queue consumer

QueueingConsumer consumer = new QueueingConsumer(channel);

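// Start consuming from the queue

/**
 * basicConsume arguments, in order:
 * 1. Queue name
 * 2. autoAck: true means messages are acknowledged automatically on delivery
 * 3. The consumer that receives the deliveries
 */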

channel.basicConsume(QUEUE_NAME,true, consumer);

while(true)

{

// nextDelivery() blocks until a message arrives (internally it calls take() on a blocking queue)

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("Received '" + message + "'");

}

}

}

1.1.2 EasySend.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.util.Scanner;

public class EasySend {

// Queue name

private final static String QUEUE_NAME = "hello world";

public static void main(String[] argv) throws java.io.IOException

{

// Create a connection factory for RabbitMQ

ConnectionFactory factory = new ConnectionFactory();

// Set the IP address or host name of the RabbitMQ host

factory.setHost("127.0.0.1");

while(true){

// Create a connection

Connection connection = factory.newConnection();

// Create a channel

Channel channel = connection.createChannel();

// Specify a queue

channel.queueDeclare(QUEUE_NAME,false,false,false, null);

// The message sent

Scanner scanner = new Scanner(System.in);

String ms = scanner.nextLine();

// String message = "hello world!";

// Send a message to the queue

channel.basicPublish("", QUEUE_NAME, null, ms.getBytes());

System.out.println("Sent '" + ms + "'");

// Close channels and connections

channel.close();

connection.close();

}

}

}

These two programs can already communicate. The connection code is identical in both, however, so we can move it into a connection utility class and repeat the simple example using that class.

1.2.1 RabbitmqConnectionUtil.java

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;

public class RabbitmqConnectionUtil {

public static Connection getConnection() throws IOException {

// Connection factory

ConnectionFactory factory = new ConnectionFactory();

factory.setHost("localhost");

// 5672 is the AMQP client port; note that 15672 is the management UI port and 25672 is the clustering port

factory.setPort(5672);

// factory.setVirtualHost("/xxxxx");

// factory.setUsername("xxxxxx");

// factory.setPassword("123456");

// Get the connection

Connection connection = factory.newConnection();

return connection;

}

}

1.2.2 UtilSend.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class UtilSend {

private final static String QUEUE_NAME = "UtilConn";

public static void main(String[] args) throws IOException {

Connection connection = RabbitmqConnectionUtil.getConnection();

// Create a channel

Channel channel = connection.createChannel();

// Declare a queue

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// Message content

String message = "this is LBW square";

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println("[x]Sent '" + message + "'");

// Finally, close the channel and the connection

channel.close();

connection.close();

}

}

1.2.3 UtilRecv.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class UtilRecv {

private final static String QUEUE_NAME = "UtilConn";

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = RabbitmqConnectionUtil.getConnection();

// Create a channel

Channel channel = connection.createChannel();

// Declare a queue

channel.queueDeclare(QUEUE_NAME,false,false,false, null);

QueueingConsumer queueingConsumer = new QueueingConsumer(channel);

channel.basicConsume(QUEUE_NAME,true,queueingConsumer);

while(true){

// This method blocks

QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("[x] Received '" + message + "'");

}

}

}

2. Work queues

The work queue is an enhanced version of the simple mode. One queue can have several producers, and several consumers compete for its messages, so each message is consumed by only one of them. Queue declaration is idempotent: declaring a queue that already exists with the same attributes does not create a second queue with that name, which is why every sender and receiver below can safely declare it.

Each of the following programs makes its main thread sleep between messages so that the results are easier to follow.

![](https://upload-images.jianshu.io/upload_images/24533109-c15da48343e02d8d?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

2.1.1 Sender1.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender1 {

private final static String QUEUE_NAME = "queue_work";

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false, null);

for(int i = 0; i < 100; i++){

String message = "lbw" + i;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println("[x] Sent '" + message + "'");

Thread.sleep(i*10);

}

channel.close();

connection.close();

}

}

2.1.2 Sender2.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender2 {

private final static String QUEUE_NAME = "queue_work";

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false, null);

for(int i = 0; i < 100; i++){

String message = "nb" + i;

channel.basicPublish("", QUEUE_NAME, null, message.getBytes());

System.out.println("[x] Sent '" + message + "'");

Thread.sleep(i*10);

}

channel.close();

connection.close();

}

}

2.1.3 Receiver1.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */

public class Receiver1 {

private final static String QUEUE_NAME = "queue_work";

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// The server sends this consumer at most one unacknowledged message at a time

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

// autoAck is false, so each message is acknowledged manually with basicAck below

channel.basicConsume(QUEUE_NAME, false, consumer);

while(true){

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("[x] Received1 '" + message + "'");

Thread.sleep(10);

// Acknowledge the message manually

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}
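Receiver1 passes autoAck = false and calls basicAck only after it has finished with a message. The following is a minimal, broker-free sketch of what that buys you, assuming the usual behaviour that deliveries left unacknowledged are handed out again when their consumer goes away. The class AckQueueSketch and all of its methods are hypothetical names for this illustration and are not part of amqp-client; putting requeued messages back at the head of the queue is also a simplification.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of manual acknowledgement; not part of amqp-client.
class AckQueueSketch {

    private final Deque<String> ready = new ArrayDeque<>();          // waiting for a consumer
    private final Map<Long, String> unacked = new LinkedHashMap<>(); // delivered, not yet acked
    private long nextTag = 1;

    void publish(String message) {
        ready.addLast(message);
    }

    /** Hands out the next ready message and returns its delivery tag, or -1 if none is ready. */
    long deliver() {
        String message = ready.pollFirst();
        if (message == null) {
            return -1;
        }
        long tag = nextTag++;
        unacked.put(tag, message);
        return tag;
    }

    /** Body of a delivered but not yet acknowledged message. */
    String body(long tag) {
        return unacked.get(tag);
    }

    /** Plays the role of basicAck(tag, false): the message is now gone for good. */
    void ack(long tag) {
        unacked.remove(tag);
    }

    /** The consumer went away: everything it had not acknowledged becomes ready again. */
    void consumerGone() {
        List<String> pending = new ArrayList<>(unacked.values());
        for (int i = pending.size() - 1; i >= 0; i--) {
            ready.addFirst(pending.get(i)); // back to the head, oldest first
        }
        unacked.clear();
    }

    int readyCount() {
        return ready.size();
    }

    public static void main(String[] args) {
        AckQueueSketch queue = new AckQueueSketch();
        queue.publish("lbw0");
        queue.publish("lbw1");

        long first = queue.deliver();
        queue.ack(first);              // processed and acknowledged: removed

        queue.deliver();               // delivered, but the consumer dies before acking
        queue.consumerGone();

        System.out.println("ready again: " + queue.readyCount()); // ready again: 1
    }
}
```

With autoAck = true, as in the simple-mode receivers, the second message in this scenario would already count as consumed and would not come back.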

2.1.4 Receiver2.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */

public class Receiver2 {

private final static String QUEUE_NAME = "queue_work";

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// The server sends only one message to the consumer at a time

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(QUEUE_NAME,false, consumer);

while(true){

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("[x] Received2 '" + message + "'");

Thread.sleep(1000);

// Acknowledge the message manually

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

2.1.5 Results

Run all four programs and the results look as follows. They show that a single message queue can serve several producers and several consumers at the same time, and that each message is handled by only one of the consumers.

![](https://upload-images.jianshu.io/upload_images/24533109-114bcd7123b7ec23?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-8d5e28c4275c2d72?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-d9f0a574b124d1c4?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-45cd8f98d3535462?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
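Both receivers call basicQos(1) and acknowledge manually, so the broker gives a receiver its next message only after the previous one has been acknowledged. The sketch below is a rough, broker-free model of why that matters when one receiver is much slower than the other. It assumes that all messages are already waiting in the queue, which is simpler than the real senders above, and it uses the two sleep times from Receiver1 (10 ms) and Receiver2 (1000 ms). DispatchSketch and its methods are hypothetical names for this illustration.

```java
import java.util.Arrays;

// Hypothetical, broker-free model of message dispatch; the names are illustrative only.
class DispatchSketch {

    /** Round-robin: message m goes to worker m % n, no matter how busy that worker is. */
    static int[] roundRobin(int messages, int workers) {
        int[] handled = new int[workers];
        for (int m = 0; m < messages; m++) {
            handled[m % workers]++;
        }
        return handled;
    }

    /** Prefetch 1 with manual ack: each message goes to the worker that becomes free first. */
    static int[] fairDispatch(int messages, long[] costMs) {
        int[] handled = new int[costMs.length];
        long[] freeAt = new long[costMs.length]; // time at which each worker acks its current message
        for (int m = 0; m < messages; m++) {
            int next = 0;
            for (int w = 1; w < costMs.length; w++) {
                if (freeAt[w] < freeAt[next]) {
                    next = w;
                }
            }
            handled[next]++;
            freeAt[next] += costMs[next];
        }
        return handled;
    }

    public static void main(String[] args) {
        long[] costMs = {10, 1000}; // Receiver1 sleeps 10 ms, Receiver2 sleeps 1000 ms
        System.out.println(Arrays.toString(roundRobin(100, 2)));        // [50, 50]
        System.out.println(Arrays.toString(fairDispatch(100, costMs))); // [99, 1]
    }
}
```

In this model the fast receiver ends up with almost all of the work, whereas plain round-robin would leave half of the messages queued behind the slow receiver.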

3. Publish/Subscribe (Fanout)

![](https://upload-images.jianshu.io/upload_images/24533109-6190e99e5bf48955?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

3.1.1 Sender.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

public class Sender {

private final static String EXCHANGE_NAME = "test_exchange_fanout";

public static void main(String[] args)

{

try

{// get the connection

Connection connection = RabbitmqConnectionUtil.getConnection();

// Get a channel from the connection

Channel channel = connection.createChannel();

// Declare the exchange (fanout: publish/subscribe mode)

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// Send a message

for(int i = 0; i < 5; i++)

{

String message = "rubenweil square " + i;

System.out.println("(send): " + message);

// Send a message

channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));

Thread.sleep(5 * i);

}

channel.close();

connection.close();

}

catch (Exception e)

{

e.printStackTrace();

}

}

}

3.1.2 Receiver1.java

import com.rabbitmq.client.*;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver1 {

// Exchange name

private final static String EXCHANGE_NAME = "test_exchange_fanout";

// Queue name

private static final String QUEUE_NAME = "test_queue_email";

public static void main(String[] args)

{

try

{

// Get the connection

Connection connection = RabbitmqConnectionUtil.getConnection();

// Get a channel from the connection

final Channel channel = connection.createChannel();

// Declare the exchange (fanout: publish/subscribe mode)

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// Declare a queue

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// Bind the queue to the exchange

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// Ensure that only one message is delivered at a time

int prefetchCount = 1;

channel.basicQos(prefetchCount);

// Define the consumer

DefaultConsumer consumer = new DefaultConsumer(channel)

{

// Execute the callback method when the message arrives

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

byte[] body) throws IOException

{

String message = new String(body, "utf-8");

System.out.println("[email] Receive message: " + message);

try

{

// Sleep for 1 s to simulate business processing

Thread.sleep(1000);

}

catch (InterruptedException e)

{

e.printStackTrace();

}

finally

{

System.out.println("[1] done");

// Acknowledge manually

channel.basicAck(envelope.getDeliveryTag(),false);

}

}

};

// Use manual acknowledgement

boolean autoAck = false;

// Listen to the queue

channel.basicConsume(QUEUE_NAME, autoAck, consumer);

}

catch (IOException e)

{

e.printStackTrace();

}

}

}

3.1.3 Receiver2.java

import com.rabbitmq.client.*;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver2 {

// Exchange name

private final static String EXCHANGE_NAME = "test_exchange_fanout";

// Queue name

private static final String QUEUE_NAME = "test_queue_phone";

public static void main(String[] args)

{

try

{

// Get the connection

Connection connection = RabbitmqConnectionUtil.getConnection();

// Get a channel from the connection

final Channel channel = connection.createChannel();

// Declare the exchange (fanout: publish/subscribe mode)

channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

// Declare a queue

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

// Bind the queue to the exchange

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");

// Ensure that only one message is delivered at a time

int prefetchCount = 1;

channel.basicQos(prefetchCount);

// Define the consumer

DefaultConsumer consumer = new DefaultConsumer(channel)

{

// Execute the callback method when the message arrives

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,

byte[] body) throws IOException

{

String message = new String(body, "utf-8");

System.out.println("[phone] Receive message: " + message);

try

{

// The consumer rests for 1s to process the business

Thread.sleep(1000);

}

catch (InterruptedException e)

{

e.printStackTrace();

}

finally

{

System.out.println("[2] done");

// Acknowledge manually

channel.basicAck(envelope.getDeliveryTag(),false);

}

}

};

// Use manual acknowledgement

boolean autoAck = false;

// Listen to the queue

channel.basicConsume(QUEUE_NAME, autoAck, consumer);

}

catch (IOException e)

{

e.printStackTrace();

}

}

}

3.1.4 Results

The program output and the RabbitMQ management console show that this is a broadcast model. Two queues with different names both receive every message, as long as they are bound to the same exchange. Messages wait in their queue until they are consumed, so a consumer that comes online later can still consume whatever has accumulated in the queue bound to the exchange. Within one queue, each message is consumed by only one consumer, and only once.

![](https://upload-images.jianshu.io/upload_images/24533109-ad5f40c062695e39?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-5da0ba2656af3c29?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-7903d1e52f0117af?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-ead33df647f207a8?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-bf5c738ab2118f22?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
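The fanout behaviour can be pictured with a small in-memory model: the exchange keeps a list of bound queues and copies each published message into every one of them. This is only a sketch of the idea, not how the broker is implemented, and FanoutSketch and its methods are hypothetical names. The queue names are the ones used by the two receivers above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory model of a fanout exchange; not part of amqp-client.
class FanoutSketch {

    // Queue name -> messages waiting in that queue
    private final Map<String, List<String>> boundQueues = new LinkedHashMap<>();

    /** Binds a queue to the exchange; binding the same queue twice has no extra effect. */
    void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayList<>());
    }

    /** Copies the message into every queue that is bound at the time of publishing. */
    void publish(String message) {
        for (List<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    List<String> queue(String queueName) {
        return boundQueues.get(queueName);
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("test_queue_email");
        exchange.bind("test_queue_phone");

        for (int i = 0; i < 5; i++) {
            exchange.publish("rubenweil square " + i);
        }

        System.out.println(exchange.queue("test_queue_email").size()); // 5
        System.out.println(exchange.queue("test_queue_phone").size()); // 5
    }
}
```

Note that in this model a queue bound after a message was published does not receive that message, which matches the usual advice to start the receivers (or at least declare and bind their queues) before sending.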

4. Routing (Direct)

1. In the previous example, we were already creating bindings. You might recall code like this:

channel.queueBind(queueName, EXCHANGE_NAME, "");

Bindings are relationships between exchanges and queues. This can be simply interpreted as: the queue is interested in the messages from this exchange.

2. Bindings can take an additional routingKey argument. To avoid confusion with the basicPublish parameter of the same name, we call it a binding key. Here is how to create a binding with a key:

channel.queueBind(queueName, EXCHANGE_NAME, "black");

Direct binding (key directly bound to a single queue)

![](https://upload-images.jianshu.io/upload_images/24533109-f4fdc243e9e76079?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

Multiple bindings (the same binding key binds multiple queues)

![](https://upload-images.jianshu.io/upload_images/24533109-45dd86a17802995d?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

With different keys bound to different queues, different log levels can be sent to different queues.

![](https://upload-images.jianshu.io/upload_images/24533109-33bff08749c06b20?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

4.1.1 Sender.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender {

private final static String EXCHANGE_NAME = "exchange_direct";

private final static String EXCHANGE_TYPE = "direct";

public static void main(String[] args) throws IOException {

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);

String message = "That must be blue";

channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());

System.out.println("[x] Sent '" + message + "'");

channel.close();

connection.close();

}

}

4.1.2 Receiver1.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */

public class Receiver1 {

private final static String QUEUE_NAME = "queue_routing";

private final static String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] args) throws IOException, InterruptedException {

// Get the connection and mq channel

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(QUEUE_NAME,false, consumer);

while(true){

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("[x] Received1 " + message);

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

4.1.3 Receiver2.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

/**
 * Created by san
 */

public class Receiver2 {

private final static String QUEUE_NAME = "queue_routing2";

private final static String EXCHANGE_NAME = "exchange_direct";

public static void main(String[] args) throws IOException, InterruptedException {

// Get the connection and mq channel

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false,null);

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(QUEUE_NAME,false, consumer);

while(true){

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("[x] Received2 " + message);

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

4.1.4 Results-Summary

Note that the receivers in this example do not declare the exchange themselves, so the exchange must already exist before a queue can be bound to it; otherwise the bind fails with an error. The first time you run the programs, start the Sender before starting the Receivers.

![](https://upload-images.jianshu.io/upload_images/24533109-e89d1f04de7e70ed?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-4cc04870a3bdaf9c?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-83cf2ad0074d02c7?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-62935237f8188a89?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-d1248beff0d4ee47?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
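The routing rule of a direct exchange can be summarised as "deliver to every queue whose binding key equals the routing key". The following broker-free sketch models just that rule with the queue names and keys used in this section. DirectSketch and its methods are hypothetical names for this illustration, not amqp-client API.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a direct exchange; not part of amqp-client.
class DirectSketch {

    private final Map<String, Set<String>> bindings = new HashMap<>();     // binding key -> queue names
    private final Map<String, List<String>> queues = new LinkedHashMap<>(); // queue name -> messages

    /** A queue may be bound with several keys, and one key may be bound to several queues. */
    void bind(String queueName, String bindingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        bindings.computeIfAbsent(bindingKey, k -> new LinkedHashSet<>()).add(queueName);
    }

    /** Delivers to every queue bound with exactly this key; returns how many queues got a copy. */
    int publish(String routingKey, String message) {
        Set<String> targets = bindings.getOrDefault(routingKey, Collections.<String>emptySet());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    List<String> queue(String queueName) {
        return queues.get(queueName);
    }

    public static void main(String[] args) {
        DirectSketch exchange = new DirectSketch();
        exchange.bind("queue_routing", "key");   // Receiver1
        exchange.bind("queue_routing", "key2");  // Receiver1
        exchange.bind("queue_routing2", "key2"); // Receiver2

        System.out.println(exchange.publish("key2", "That must be blue")); // 2
        System.out.println(exchange.publish("key", "only Receiver1"));     // 1
        System.out.println(exchange.publish("other", "nobody"));           // 0
    }
}
```

A message whose routing key matches no binding reaches no queue in this model, so the publisher gets no error and the message is simply not stored anywhere.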

5. Topic

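As with fanout, messages wait in the bound queues until they are consumed. As long as the exchange and its bindings are in place, each consumer's queue receives one copy of every message whose topic it is interested in. A topic is a routing key made of words separated by dots, and a binding key may contain two wildcards:

`*` (asterisk) matches exactly one word.

`#` (hash) matches zero or more words.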

![](https://upload-images.jianshu.io/upload_images/24533109-f23b2977594dd8b1?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)

5.1.1 Sender.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Sender {

private final static String EXCHANGE_NAME = "exchange_topic";

private final static String EXCHANGE_TYPE = "topic";

public static void main(String[] args) throws IOException {

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);

// Message content

String message = "this is Lubenweil Square";

// The second argument is the routing key that the topic bindings are matched against

channel.basicPublish(EXCHANGE_NAME, "lbw.nb", null, message.getBytes());

System.out.println("[x] Sent '" + message + "'");

// Close the connection

channel.close();

connection.close();

}

}

5.1.2 Receiver1.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver1 {

private final static String QUEUE_NAME = "queue_topic";

private final static String EXCHANGE_NAME = "exchange_topic";

private final static String EXCHANGE_TYPE = "topic";

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false, null);

// The third argument is the binding key: the topic pattern this queue is interested in

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.nb.*");

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(QUEUE_NAME,false, consumer);

while(true){

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("[x] Received1 '" + message + "'");

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

5.1.3 Receiver2.java

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.QueueingConsumer;

import top.san.RabbitMq.util.RabbitmqConnectionUtil;

import java.io.IOException;

public class Receiver2 {

private final static String QUEUE_NAME = "queue_topic2";

private final static String EXCHANGE_NAME = "exchange_topic";

private final static String EXCHANGE_TYPE = "topic";

public static void main(String[] args) throws IOException, InterruptedException {

Connection connection = RabbitmqConnectionUtil.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME,false,false,false, null);

// The third argument is the binding key: the topic pattern this queue is interested in

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.#");

channel.basicQos(1);

QueueingConsumer consumer = new QueueingConsumer(channel);

channel.basicConsume(QUEUE_NAME,false, consumer);

while(true){

QueueingConsumer.Delivery delivery = consumer.nextDelivery();

String message = new String(delivery.getBody());

System.out.println("[x] Received2 '" + message + "'");

Thread.sleep(10);

channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);

}

}

}

5.1.4 Results-analysis

The special feature of a topic exchange is that each queue receives only the messages whose topics it is interested in, and a single binding key can cover a whole family of routing keys by using the wildcards * and #.

![](https://upload-images.jianshu.io/upload_images/24533109-bfef0558b4a7e694?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-c58d7e5aa95ff638?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-f9670e7399c20bf6?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-558b012b39094dad?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
![](https://upload-images.jianshu.io/upload_images/24533109-e320a509975c76b7?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
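To make the wildcard rules concrete, here is a small matcher written from the two rules stated in this section: `*` stands for exactly one dot-separated word and `#` for zero or more words. It is a sketch for reasoning about binding keys, not the broker's own algorithm, and TopicMatchSketch is a hypothetical name. It shows that the routing key lbw.nb sent by the Sender matches Receiver2's binding key lbw.# but not Receiver1's lbw.nb.*, because the latter requires a third word.

```java
// Hypothetical helper for reasoning about topic binding keys; not part of amqp-client.
class TopicMatchSketch {

    /** Returns true if the routing key matches the binding key under the two wildcard rules. */
    static boolean matches(String bindingKey, String routingKey) {
        return match(words(bindingKey), 0, words(routingKey), 0);
    }

    /** Splits a key into its dot-separated words; the empty key has no words. */
    private static String[] words(String key) {
        return key.isEmpty() ? new String[0] : key.split("\\.", -1);
    }

    private static boolean match(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length; // pattern used up: the key must be used up too
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero, one, or several words: try every possible split
            for (int next = k; next <= key.length; next++) {
                if (match(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && match(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        System.out.println(matches("lbw.#", "lbw.nb"));        // true  (Receiver2)
        System.out.println(matches("lbw.nb.*", "lbw.nb"));     // false (Receiver1)
        System.out.println(matches("lbw.nb.*", "lbw.nb.666")); // true
    }
}
```

Changing the Sender's routing key to something with three words, such as the hypothetical lbw.nb.666 used above, would make the message reach both queues.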

6. RPC (Remote Procedure Call)

The diagram below gives an idea of how a call and its callback reply travel between the client and the server.

![](https://upload-images.jianshu.io/upload_images/24533109-ee4802155e1249e1?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
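Since this section has no listing, here is a broker-free sketch of the request/reply idea, under the assumption that the pattern works the way it is usually described for RabbitMQ: the client puts a request on a shared request queue, attaches a callback queue and a correlation id, and the server sends the result to that callback queue with the same id. Everything here (RpcSketch, Message, RPC_QUEUE, the string-reversing "service") is hypothetical and uses plain java.util.concurrent queues instead of amqp-client.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory sketch of RPC over queues; none of these names come from amqp-client.
class RpcSketch {

    /** A request or a reply. The correlation id ties a reply to its request. */
    static final class Message {
        final String correlationId;
        final String body;
        final BlockingQueue<Message> replyTo; // callback queue; null on replies

        Message(String correlationId, String body, BlockingQueue<Message> replyTo) {
            this.correlationId = correlationId;
            this.body = body;
            this.replyTo = replyTo;
        }
    }

    /** Plays the role of the shared request queue. */
    static final BlockingQueue<Message> RPC_QUEUE = new LinkedBlockingQueue<>();

    /** Server: takes requests, reverses the text, replies on the caller's callback queue. */
    static void startServer() {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = RPC_QUEUE.take();
                    String result = new StringBuilder(request.body).reverse().toString();
                    request.replyTo.add(new Message(request.correlationId, result, null));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true); // let the JVM exit when main returns
        server.start();
    }

    /** Client: sends a request, then blocks until the reply with the matching id arrives. */
    static String call(String body) {
        String correlationId = UUID.randomUUID().toString();
        BlockingQueue<Message> callbackQueue = new LinkedBlockingQueue<>();
        RPC_QUEUE.add(new Message(correlationId, body, callbackQueue));
        try {
            while (true) {
                Message reply = callbackQueue.take();
                if (reply.correlationId.equals(correlationId)) {
                    return reply.body;
                }
                // A reply with another id does not belong to this request; skip it.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the reply", e);
        }
    }

    public static void main(String[] args) {
        startServer();
        System.out.println(call("lbw")); // wbl
    }
}
```

The correlation id matters once a client reuses one callback queue for several outstanding requests, because it is then the only way to tell which reply answers which call.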

3. Exchanges in RabbitMQ

RabbitMQ has the concept of an exchange. A client publishes a message to an exchange, and the exchange forwards it to the matching queues. Workers then take unprocessed messages from the queues and handle them. As a result, the sender of a message does not need to know about its consumers, and vice versa.

Direct exchange: forwards a message to the queues whose binding key equals the message's routingKey

Fanout exchange: forwards a message to all bound queues (equivalent to broadcasting)

Topic exchange: forwards a message according to pattern-matching rules on the routing key (flexible)

Headers exchange: forwards a message based on its header attributes rather than the routing key

In channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); the first argument is the exchange name. An empty string means that the default exchange is used. Four exchange types are available: direct, topic, headers, and fanout.

![](https://upload-images.jianshu.io/upload_images/24533109-34f0177b0754224e?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)