Adding the Maven dependency
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
Java examples of seven working modes
1. Simple mode
The simplest pattern has one producer and one consumer. The producer publishes a message to a queue; the consumer listens on that queue and consumes the message when it arrives. The message is one-off: once it has been consumed, it is removed from the queue.
1.1.1 EasyRecv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class EasyRecv {
// Queue name
private final static String QUEUE_NAME = "hello world";
public static void main(String[] argv) throws java.io.IOException,java.lang.InterruptedException {
// Open a connection and create a channel, as on the sender
ConnectionFactory factory = new ConnectionFactory();
// Set the IP address or host name of the RabbitMQ host
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Declare the queue. Declaring it here as well lets the receiver start before the sender without failing on a queue that does not exist yet.
/*
 * queueDeclare arguments, in order:
 * - queue name
 * - durable: whether the queue survives a broker restart
 * - exclusive: whether the queue is restricted to the connection that declared it; typically true only when a single consumer may use the queue
 * - autoDelete: whether the queue is deleted automatically once its last consumer unsubscribes
 * - arguments: other queue attributes
 */
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
System.out.println("Waiting for messages. To exit press CTRL+C");
// Create a queue consumer
QueueingConsumer consumer = new QueueingConsumer(channel);
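// Start consuming from the queue
/*
 * basicConsume arguments, in order:
 * - queue name
 * - autoAck: true means messages are acknowledged automatically on delivery
 * - the consumer that receives the deliveries
 */
channel.basicConsume(QUEUE_NAME, true, consumer);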
while(true)
{
// nextDelivery() blocks until a message arrives (internally it calls take() on a blocking queue)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "'");
}
}
}
1.1.2 EasySend.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class EasySend {
// Queue name
private final static String QUEUE_NAME = "hello world";
public static void main(String[] argv) throws java.io.IOException
{
// Create a connection factory for RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
// Set the IP address or host name of the RabbitMQ host
factory.setHost("127.0.0.1");
// Read the messages to send from standard input
Scanner scanner = new Scanner(System.in);
while(true){
// Create a connection
Connection connection = factory.newConnection();
// Create a channel
Channel channel = connection.createChannel();
// Declare the queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// The message to send
String ms = scanner.nextLine();
//String message = "hello world!";
// Publish the message to the queue through the default exchange
channel.basicPublish("", QUEUE_NAME, null, ms.getBytes());
System.out.println("Sent '" + ms + "'");
// Close the channel and the connection
channel.close();
connection.close();
}
}
}
The two programs above can already exchange messages. Both repeat the same connection setup code, so it is worth extracting that code into a connection utility class. The next example does the same thing using such a class.
1.2.1 RabbitmqConnectionUtil.java
package top.san.RabbitMq.util;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class RabbitmqConnectionUtil {
public static Connection getConnection() throws IOException {
// Connect factory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// Clients connect on port 5672; note that 15672 is the management UI port and 25672 is the cluster port
factory.setPort(5672);
// factory.setVirtualHost("/xxxxx");
// factory.setUsername("xxxxxx");
// factory.setPassword("123456");
// Get the connection
Connection connection = factory.newConnection();
return connection;
}
}
1.2.2 UtilSend.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class UtilSend {
private final static String QUEUE_NAME = "UtilConn";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// Message content
String message = "this is LBW square";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
// Finally, close the channel and the connection
channel.close();
connection.close();
}
}
1.2.3 UtilRecv.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class UtilRecv {
private final static String QUEUE_NAME = "UtilConn";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
// Create a channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare(QUEUE_NAME,false,false,false, null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
while(true){
// This method blocks
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received '" + message + "'");
}
}
}
2. Work queues
Work queues extend the simple mode: a single queue can have several producers and several consumers competing for its messages. Queue declaration still has to be idempotent. Declaring a queue that already exists succeeds only when the arguments match the existing queue; redeclaring the same name with different arguments fails.
Each of the following programs sleeps on its main thread so that the results are easier to follow.
2.1.1 Sender1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender1 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i = 0; i < 100; i++){
String message = "lbw" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
}
2.1.2 Sender2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender2 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
for(int i = 0; i < 100; i++){
String message = "nb" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
Thread.sleep(i*10);
}
channel.close();
connection.close();
}
}
2.1.3 Receiver1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/**
 * Created by san
 */
public class Receiver1 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// The broker delivers at most one unacknowledged message to this consumer at a time
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
// autoAck is false, so each message is acknowledged manually with basicAck below
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 '" + message + "'");
Thread.sleep(10);
// Acknowledge the message
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
2.1.4 Receiver2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/**
 * Created by san
 */
public class Receiver2 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// The broker delivers at most one unacknowledged message to this consumer at a time
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 '" + message + "'");
Thread.sleep(1000);
// Acknowledge the message
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}
}
2.1.5 Results
Run all four programs together. The output shows that a single message queue can serve multiple producers and multiple consumers at the same time.
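The competing-consumer behaviour can also be tried without a broker. The following is a minimal in-memory sketch, not RabbitMQ code: a `LinkedBlockingQueue` stands in for the queue, two threads stand in for Receiver1 and Receiver2, and the class name `WorkQueueSketch` and the `STOP` marker are made up for this illustration. Each worker takes its next message only after finishing the previous one, which is roughly the effect that `basicQos(1)` with manual acknowledgement has in the receivers above.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

public class WorkQueueSketch {
    // Hypothetical end-of-work marker; it is not part of RabbitMQ.
    private static final String STOP = "__stop__";

    // Runs a fast and a slow worker over one shared queue.
    // Returns {messages handled by the fast worker, messages handled by the slow worker}.
    public static int[] run(int messageCount) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicInteger fast = new AtomicInteger();
        AtomicInteger slow = new AtomicInteger();

        Thread w1 = new Thread(() -> consume(queue, fast, 1));
        Thread w2 = new Thread(() -> consume(queue, slow, 10));
        w1.start();
        w2.start();

        for (int i = 0; i < messageCount; i++) {
            queue.add("lbw" + i);
        }
        // One stop marker per worker; a worker exits as soon as it takes one.
        queue.add(STOP);
        queue.add(STOP);

        try {
            w1.join();
            w2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new int[] { fast.get(), slow.get() };
    }

    // Takes messages one at a time; each message is handled by exactly one worker.
    private static void consume(BlockingQueue<String> queue, AtomicInteger handled, long workMillis) {
        try {
            while (true) {
                String message = queue.take();
                if (message.equals(STOP)) {
                    return;
                }
                Thread.sleep(workMillis); // simulated processing time
                handled.incrementAndGet();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        int[] counts = run(50);
        System.out.println("fast worker: " + counts[0] + ", slow worker: " + counts[1]);
    }
}
```

The two counts always add up to the number of messages sent, because every message is taken by exactly one worker; how the work splits between them depends on timing.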
3. Publish/Subscribe (Fanout)
3.1.1 Sender.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
public class Sender {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args)
{
try
{
// Get the connection
Connection connection = RabbitmqConnectionUtil.getConnection();
// Get a channel from the connection
Channel channel = connection.createChannel();
// Declare the exchange (fanout: publish/subscribe mode)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// Send the messages
for(int i = 0; i < 5; i++)
{
String message = "rubenweil square " + i;
System.out.println("(send): " + message);
// Publish to the exchange; a fanout exchange ignores the routing key
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes("utf-8"));
Thread.sleep(5 * i);
}
channel.close();
connection.close();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
3.1.2 Receiver1.java
import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver1 {
// Exchange name
private final static String EXCHANGE_NAME = "test_exchange_fanout";
// Queue name
private static final String QUEUE_NAME = "test_queue_email";
public static void main(String[] args)
{
try
{
// Get the connection
Connection connection = RabbitmqConnectionUtil.getConnection();
// Get a channel from the connection
final Channel channel = connection.createChannel();
// Declare the exchange (fanout: publish/subscribe mode)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// Declare a queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// Bind the queue to the exchange
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// Ensure that only one message is delivered at a time
int prefetchCount = 1;
channel.basicQos(prefetchCount);
// Define the consumer
DefaultConsumer consumer = new DefaultConsumer(channel)
{
// Execute the callback method when the message arrives
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "utf-8");
System.out.println("[email] Receive message: " + message);
try
{
// Simulate 1 s of business processing
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println("[1] done");
// Acknowledge manually
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// Use manual acknowledgement
boolean autoAck = false;
// Listen to the queue
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
3.1.3 Receiver2.java
import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver2 {
// Exchange name
private final static String EXCHANGE_NAME = "test_exchange_fanout";
// Queue name
private static final String QUEUE_NAME = "test_queue_phone";
public static void main(String[] args)
{
try
{
// Get the connection
Connection connection = RabbitmqConnectionUtil.getConnection();
// Get a channel from the connection
final Channel channel = connection.createChannel();
// Declare the exchange (fanout: publish/subscribe mode)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// Declare a queue
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// Bind the queue to the exchange
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
// Ensure that only one message is delivered at a time
int prefetchCount = 1;
channel.basicQos(prefetchCount);
// Define the consumer
DefaultConsumer consumer = new DefaultConsumer(channel)
{
// Execute the callback method when the message arrives
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "utf-8");
System.out.println("[phone] Receive message: " + message);
try
{
// Simulate 1 s of business processing
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println("[2] done");
// Acknowledge manually
channel.basicAck(envelope.getDeliveryTag(), false);
}
}
};
// Use manual acknowledgement
boolean autoAck = false;
// Listen to the queue
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
3.1.4 Results
The program output and the RabbitMQ management console show that this is a broadcast model: two queues with different names both receive every message, provided both are bound to the same exchange. Messages stay in each bound queue until they are consumed, so a consumer that comes online later can still consume whatever its queue has collected (the queues here are declared non-durable, so they do not survive a broker restart). Within a queue, each message is consumed only once, by a single consumer.
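The broadcast behaviour can be pictured with a small in-memory model. This is only a sketch under simplifying assumptions, not the broker's implementation and not RabbitMQ client code; the class `FanoutSketch` and its methods are invented for illustration. Each bound queue gets its own copy of every published message, and a message leaves a queue once it is consumed.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

public class FanoutSketch {
    // queue name -> messages waiting in that queue
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Binds a queue to this (simulated) fanout exchange.
    public void bind(String queueName) {
        boundQueues.putIfAbsent(queueName, new ArrayDeque<>());
    }

    // Copies the message into every bound queue; no routing key is involved.
    public void publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    // Removes and returns the next message of a queue, or null if there is none.
    public String consume(String queueName) {
        Queue<String> queue = boundQueues.get(queueName);
        return queue == null ? null : queue.poll();
    }

    public static void main(String[] args) {
        FanoutSketch exchange = new FanoutSketch();
        exchange.bind("test_queue_email");
        exchange.bind("test_queue_phone");
        exchange.publish("rubenweil square 0");
        System.out.println(exchange.consume("test_queue_email")); // rubenweil square 0
        System.out.println(exchange.consume("test_queue_phone")); // rubenweil square 0
        System.out.println(exchange.consume("test_queue_email")); // null
    }
}
```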
4. Routing (Direct)
1. In the previous example, we were already creating bindings. You might recall code like this:
channel.queueBind(queueName, EXCHANGE_NAME, "");
Bindings are relationships between exchanges and queues. This can be simply interpreted as: the queue is interested in the messages from this exchange.
2. Bindings can take an additional routingKey argument. To avoid confusion with the routing key parameter of basicPublish, we call it a binding key. Here's how we can create a binding with a key:
channel.queueBind(queueName, EXCHANGE_NAME, "black");
Direct binding (key directly bound to a single queue)
Multiple bindings (the same binding key binds multiple queues)
With different keys bound to different queues, different log levels can be sent to different queues.
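To make the binding-key idea concrete, here is a small in-memory sketch of direct routing. It is an illustration only, not RabbitMQ client code or the broker's implementation; the class `DirectRoutingSketch` and its methods are assumptions made for this example. A message reaches exactly the queues whose binding key equals its routing key.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DirectRoutingSketch {
    // binding key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();

    // Records that the queue is bound to the exchange with the given binding key.
    public void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(bindingKey, k -> new ArrayList<>()).add(queue);
    }

    // Returns the queues that would receive a message published with this routing key.
    public List<String> route(String routingKey) {
        return new ArrayList<>(bindings.getOrDefault(routingKey, Collections.<String>emptyList()));
    }

    public static void main(String[] args) {
        DirectRoutingSketch exchange = new DirectRoutingSketch();
        // One queue bound with two keys, and a second queue sharing one of them.
        exchange.bind("queue_routing", "key");
        exchange.bind("queue_routing", "key2");
        exchange.bind("queue_routing2", "key2");

        System.out.println(exchange.route("key2"));  // [queue_routing, queue_routing2]
        System.out.println(exchange.route("key"));   // [queue_routing]
        System.out.println(exchange.route("other")); // []
    }
}
```

A routing key with no matching binding yields an empty list, which corresponds to the message not being routed to any queue.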
4.1.1 Sender.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender {
private final static String EXCHANGE_NAME = "exchange_direct";
private final static String EXCHANGE_TYPE = "direct";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
String message = "That must be blue";
channel.basicPublish(EXCHANGE_NAME, "key2", null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
channel.close();
connection.close();
}
}
4.1.2 Receiver1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/**
 * Created by san
 */
public class Receiver1 {
private final static String QUEUE_NAME = "queue_routing";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, InterruptedException {
// Get the connection and an MQ channel
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// Bind the same queue with two binding keys
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key");
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 " + message);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
4.1.3 Receiver2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/**
 * Created by san
 */
public class Receiver2 {
private final static String QUEUE_NAME = "queue_routing2";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, InterruptedException {
// Get the connection and an MQ channel
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "key2");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 " + message);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
4.1.4 Results-Summary
Note that with a direct exchange, the exchange must already exist before a queue can be bound to it; otherwise an error is reported. The receivers here do not declare the exchange themselves, so on the first run the Sender has to be started before the Receivers can start successfully.
5. Topic
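As with fanout, messages stay in the bound queues until they are consumed: as long as the exchange exists, each online consumer can consume a message on a topic of interest once. Binding keys are patterns made of dot-separated words, with two wildcards:
* (asterisk) matches exactly one word.
# (hash) matches zero or more words.
5.1.1 Sender.java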
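The two wildcard rules can be checked with a short stand-alone matcher. This is a simplified sketch of the matching rules as stated above, written for illustration; it is not the broker's actual algorithm, and the class name `TopicMatchSketch` is an assumption of this example.

```java
public class TopicMatchSketch {
    // Returns true if the dot-separated routing key matches the binding pattern.
    public static boolean matches(String pattern, String routingKey) {
        String[] patternWords = pattern.isEmpty() ? new String[0] : pattern.split("\\.");
        String[] keyWords = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(patternWords, 0, keyWords, 0);
    }

    private static boolean matches(String[] p, int i, String[] k, int j) {
        if (i == p.length) {
            // Pattern exhausted: it matches only if the key is exhausted too.
            return j == k.length;
        }
        if (p[i].equals("#")) {
            // '#' may absorb zero or more words: try every possible split point.
            for (int next = j; next <= k.length; next++) {
                if (matches(p, i + 1, k, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == k.length) {
            // Pattern still expects a word, but the key has none left.
            return false;
        }
        if (p[i].equals("*") || p[i].equals(k[j])) {
            // '*' matches exactly one word; a literal must be equal.
            return matches(p, i + 1, k, j + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(matches("lbw.#", "lbw.nb"));      // true
        System.out.println(matches("lbw.#", "lbw"));         // true
        System.out.println(matches("lbw.nb.*", "lbw.nb"));   // false
        System.out.println(matches("lbw.nb.*", "lbw.nb.x")); // true
    }
}
```

The third call returns false because `*` needs exactly one more word after `lbw.nb`, while `#` in the first two calls accepts any number of words, including none.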
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender {
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
// Message content
String message = "this is Lubenweil Square";
// The second argument is the routing key that the topic bindings are matched against
channel.basicPublish(EXCHANGE_NAME, "lbw.nb", null, message.getBytes());
System.out.println("[x] Sent '" + message + "'");
// Close the channel and the connection
channel.close();
connection.close();
}
}
5.1.2 Receiver1.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver1 {
private final static String QUEUE_NAME = "queue_topic";
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// The third argument is the binding pattern for the topics this queue is interested in
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.nb.*");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
5.1.3 Receiver2.java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver2 {
private final static String QUEUE_NAME = "queue_topic2";
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false, false, false, null);
// The third argument is the binding pattern for the topics this queue is interested in
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.#");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 '" + message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
}
}
5.1.4 Results-analysis
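What makes a topic exchange special is that each queue receives only the messages on topics it is interested in, and can describe those topics with the wildcards * and #. In this example the routing key lbw.nb matches Receiver2's pattern lbw.# but not Receiver1's pattern lbw.nb.*, which requires exactly one more word after lbw.nb.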
6. RPC (Remote Procedure Call)
This mode implements a request/reply callback: the client publishes a request that names a reply (callback) queue, and the server publishes the result to that queue.
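The original gives no code for this mode, so the following is only a rough in-memory sketch of the request/reply idea and not RabbitMQ client code. It assumes the usual shape of the pattern: each request carries a reply queue and a correlation id, and the client accepts only the reply whose correlation id matches. The class `RpcSketch`, the `Message` type and the "echo" behaviour of the server are all made up for illustration.

```java
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class RpcSketch {
    // A message with the two properties the pattern relies on (hypothetical type).
    public static final class Message {
        final String correlationId;
        final BlockingQueue<Message> replyTo;
        final String body;

        Message(String correlationId, BlockingQueue<Message> replyTo, String body) {
            this.correlationId = correlationId;
            this.replyTo = replyTo;
            this.body = body;
        }
    }

    // Client side: send a request, then wait on a private reply queue for the matching reply.
    public static String call(BlockingQueue<Message> requestQueue, String body) {
        BlockingQueue<Message> replyQueue = new LinkedBlockingQueue<>();
        String correlationId = UUID.randomUUID().toString();
        requestQueue.add(new Message(correlationId, replyQueue, body));
        try {
            while (true) {
                Message reply = replyQueue.poll(5, TimeUnit.SECONDS);
                if (reply == null) {
                    throw new IllegalStateException("no reply within 5 seconds");
                }
                // Ignore replies that belong to some other request.
                if (reply.correlationId.equals(correlationId)) {
                    return reply.body;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Server side: take requests and answer each one on the reply queue it names.
    public static Thread startServer(BlockingQueue<Message> requestQueue) {
        Thread server = new Thread(() -> {
            try {
                while (true) {
                    Message request = requestQueue.take();
                    String result = "echo: " + request.body; // stand-in for real work
                    request.replyTo.add(new Message(request.correlationId, null, result));
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        server.setDaemon(true); // lets the JVM exit when main finishes
        server.start();
        return server;
    }

    public static void main(String[] args) {
        BlockingQueue<Message> requestQueue = new LinkedBlockingQueue<>();
        startServer(requestQueue);
        System.out.println(call(requestQueue, "hello")); // echo: hello
    }
}
```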
3. Exchanges in RabbitMQ
RabbitMQ has the concept of an exchange. A client sends a message to an exchange, the exchange forwards it to the matching queues, and workers take unprocessed messages from those queues. The sender therefore does not need to know about the consumers, and vice versa.
Direct exchange: forwards a message to the queues whose binding key equals the message's routingKey
Fanout exchange: forwards messages to all bound queues (equivalent to broadcasting)
Topic exchange: forwards messages according to pattern rules (flexible)
Headers exchange: routes on message header attributes instead of the routing key
channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); The first argument of this method is the exchange name; an empty string means the default exchange is used. Four exchange types are available: direct, topic, headers, and fanout.