Maven dependency add
<! -- Rabbitmq dependencies --><dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>3.0.4</version>
</dependency>
Copy the code
Java examples of seven working modes
1. Simple mode
In the simplest one consumer and one producer pattern, the producer generates the message, the consumer listens for the message, and if the consumer listens for the message it needs, it will consume the message. This kind of message is secondary, which is consumed and then lost.
1.1.1, EasyRecv. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.QueueingConsumer;
public class EasyRecv {
// Queue name
private final static String QUEUE_NAME = "hello world";
public static void main(String[] argv) throws java.io.IOException,java.lang.InterruptedException {
// Open a connection and create a channel, as on the sender
ConnectionFactory factory = new ConnectionFactory();
// Set the IP address or host name of the RabbitMQ host
factory.setHost("127.0.0.1");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
// Declare a queue, mainly to prevent message receivers from running the program first and creating queues that do not already exist.
/** * queue name * Persistent * exclusive that is, only the channel is allowed to access the queue. This is used when only one consumer can consume in a queue
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
System.out.println("Waiting for messages. To exit press CTRL+C");
// Create a queue consumer
QueueingConsumer consumer = new QueueingConsumer(channel);
// Specify the consumption queue
/** * Queue name * Other attribute routing * message body */
channel.basicConsume(QUEUE_NAME, true, consumer);
while (true)
{
NextDelivery is a blocking method (the internal implementation actually blocks the queue take method)
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("Received '" + message + "'"); }}}Copy the code
1.1.2, EasySend. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.util.Scanner;
public class EasySend {
// Queue name
private final static String QUEUE_NAME = "hello world";
public static void main(String[] argv) throws java.io.IOException
{
/** * create a connection to MabbitMQ */
ConnectionFactory factory = new ConnectionFactory();
// Set the IP address or host name of the MabbitMQ host
factory.setHost("127.0.0.1");
while (true) {// Create a connection
Connection connection = factory.newConnection();
// Create a channel
Channel channel = connection.createChannel();
// Specify a queue
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
// The message sent
Scanner scanner = new Scanner(System.in);
String ms = scanner.nextLine();
//String message = "hello world!" ;
// Send a message to the queue
channel.basicPublish("", QUEUE_NAME, null, ms.getBytes());
System.out.println("Sent '" + ms + "'");
// Close channels and connectionschannel.close(); connection.close(); }}Copy the code
The above two can already communicate. Here is a simple example again, but we can see that at the code level, the connection code is the same, so we can create a connection utility class.
1.2.1, RabbitmqConnectionUtil. Java
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
public class RabbitmqConnectionUtil {
public static Connection getConnection() throws IOException {
// Connect factory
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
// Connect port 5672 notice 15672 is the tool interface port 25672 is the cluster port
factory.setPort(5672);
//factory.setVirtualHost("/xxxxx");
// factory.setUsername("xxxxxx");
// factory.setPassword("123456");
// Get the connection
Connection connection = factory.newConnection();
returnconnection; }}Copy the code
1.2.2, UtilSend. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class UtilSend {
private final static String QUEUE_NAME = "UtilConn";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
// Create channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
// Message content
String message = "This is LBW Square.";
channel.basicPublish("", QUEUE_NAME,null,message.getBytes());
System.out.println("[x]Sent '"+message + "'");
// Finally close the pass and connectionchannel.close(); connection.close(); }}Copy the code
1.2.3, UtilRecv. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class UtilRecv {
private final static String QUEUE_NAME = "UtilConn";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = null;
connection = RabbitmqConnectionUtil.getConnection();
// Create channel
Channel channel = connection.createChannel();
// Declare a queue
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
QueueingConsumer queueingConsumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME,true,queueingConsumer);
while(true) {// This method blocks
QueueingConsumer.Delivery delivery = queueingConsumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received '"+message+"'"); }}}Copy the code
2. Work queues
Job queue is the simple model of improved version, a queue can be more producers, also can have multiple consumers’ message to competition, but we still need to guarantee the idempotence of the queue, the queue is not able to create a queue with the same, there’s a concern the public number: kirin to bugs, you can also get the Java core knowledge and mind maps Java core study notes.
Each of the following processes controls its main thread sleep to give us a better view of the results.
2.1.1, Sender1. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender1 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
for(int i = 0; i < 100; i++){
String message = "lbw" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '"+message + "'");
Thread.sleep(i*10); } channel.close(); connection.close(); }}Copy the code
2.1.2, Sender2. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender2 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
for(int i = 0; i < 100; i++){
String message = "nb" + i;
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println("[x] Sent '"+message + "'");
Thread.sleep(i*10); } channel.close(); connection.close(); }}Copy the code
2.1.3, Receiver1. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/** * Created by san */
public class Receiver1 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
// The server sends only one message to the consumer at a time
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
// About manual confirmation, I have time to study it later
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 '"+message+"'");
Thread.sleep(10);
// Return to confirm status
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}}Copy the code
2.1.4, Receiver2. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/** * Created by san */
public class Receiver2 {
private final static String QUEUE_NAME = "queue_work";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
// The server sends only one message to the consumer at a time
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 '"+message+"'");
Thread.sleep(1000);
// Return to confirm status
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}}Copy the code
2.1.5, results,
The above four programs are all run, and the results can be seen as follows. According to the analysis of the results, it can be known that the same message queue can have multiple producers and consumers.
3. Publish/Subscribe (Fanout)
3.1.1, Sender. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
public class Sender {
private final static String EXCHANGE_NAME = "test_exchange_fanout";
public static void main(String[] args)
{
try
{
// Get the connection
Connection connection = RabbitmqConnectionUtil.getConnection();
// Get a channel from the connection
Channel channel = connection.createChannel();
// Declare switches (distribution: publish/subscribe mode)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// Send a message
for (int i = 0; i < 5; i++)
{
String message = "Lubenweil Square" + i;
System.out.println("[the send]." + message);
// Send a message
channel.basicPublish(EXCHANGE_NAME, "".null, message.getBytes("utf-8"));
Thread.sleep(5 * i);
}
channel.close();
connection.close();
}
catch(Exception e) { e.printStackTrace(); }}}Copy the code
3.1.2, Receiver1. Java
import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver1 {
// Switch name
private final static String EXCHANGE_NAME = "test_exchange_fanout";
// Queue name
private static final String QUEUE_NAME = "test_queue_email";
public static void main(String[] args)
{
try
{
// Get the connection
Connection connection = RabbitmqConnectionUtil.getConnection();
// Get a channel from the connection
final Channel channel = connection.createChannel();
// Declare switches (distribution: publish/subscribe mode)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// Declare a queue
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
// Bind queues to switches
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
Ensure that only one is distributed at a time
int prefetchCount = 1;
channel.basicQos(prefetchCount);
// Define the consumer
DefaultConsumer consumer = new DefaultConsumer(channel)
{
// Execute the callback method when the message arrives
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "utf-8");
System.out.println([email] Receive message: + message);
try
{
// Consumer rest 2s deals with business
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println("[1] done");
// Reply manually
channel.basicAck(envelope.getDeliveryTag(), false); }}};// Set manual answer
boolean autoAck = false;
// Listen to the queue
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
catch(IOException e) { e.printStackTrace(); }}}Copy the code
3.1.3, Receiver2. Java
import com.rabbitmq.client.*;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver2 {
// Switch name
private final static String EXCHANGE_NAME = "test_exchange_fanout";
// Queue name
private static final String QUEUE_NAME = "test_queue_phone";
public static void main(String[] args)
{
try
{
// Get the connection
Connection connection = RabbitmqConnectionUtil.getConnection();
// Get a channel from the connection
final Channel channel = connection.createChannel();
// Declare switches (distribution: publish/subscribe mode)
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
// Declare a queue
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
// Bind queues to switches
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");
Ensure that only one is distributed at a time
int prefetchCount = 1;
channel.basicQos(prefetchCount);
// Define the consumer
DefaultConsumer consumer = new DefaultConsumer(channel)
{
// Execute the callback method when the message arrives
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException
{
String message = new String(body, "utf-8");
System.out.println([phone] Receive message: + message);
try
{
// The consumer rests for 1s to process the business
Thread.sleep(1000);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
finally
{
System.out.println("[2] done");
// Reply manually
channel.basicAck(envelope.getDeliveryTag(), false); }}};// Set manual answer
boolean autoAck = false;
// Listen to the queue
channel.basicConsume(QUEUE_NAME, autoAck, consumer);
}
catch(IOException e) { e.printStackTrace(); }}}Copy the code
3.1.4, results,
See from application results and the RabbitMq background, belong to the news broadcast model, two different name to receive the message queue, only they will be bound to the same switch, and the message is persistent, as long as the switch is still there, what time online consumers can consume it bound by the switch, And there’s only one consumer and there’s only one consumer
4. Routing (Direct)
- In the previous example, we were already creating bindings. You might recall code like this:
Channel. queueBind (queueName, EXCHANGE_NAME, "");Copy the code
Bindings are relationships between exchanges and queues. This can be simply interpreted as: the queue is interested in the messages from this exchange.
- Bindings can take additional routingKey arguments. To avoid confusion with the basic_publish parameter, we call it a binding key. Here’s how we can create bindings with keys:
Channel. queueBind (queueName, EXCHANGE_NAME, "black");Copy the code
- Direct binding (key directly bound to a single queue)
- Multiple bindings (the same binding key binds multiple queues)
- With different keys bound to different queues, different log levels can be sent to different queues.
4.4.1, Sender
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender {
private final static String EXCHANGE_NAME = "exchange_direct";
private final static String EXCHANGE_TYPE = "direct";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME,EXCHANGE_TYPE);
String message = "That must be blue.";
channel.basicPublish(EXCHANGE_NAME,"key2".null, message.getBytes());
System.out.println("[x] Sent '"+message+"'"); channel.close(); connection.close(); }}Copy the code
4.1.2, Receiver1. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/** * Created by san */
public class Receiver1 {
private final static String QUEUE_NAME = "queue_routing";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, InterruptedException {
// Get the connection and mq channel
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key");
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 "+message);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}}Copy the code
4.1.3, Receiver2. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
/** * Created by san */
public class Receiver2 {
private final static String QUEUE_NAME = "queue_routing2";
private final static String EXCHANGE_NAME = "exchange_direct";
public static void main(String[] args) throws IOException, InterruptedException {
// Get the connection and mq channel
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
channel.queueBind(QUEUE_NAME,EXCHANGE_NAME,"key2");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 "+message);
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}}Copy the code
4.1.4 Results-Summary
Note that under Direct, the Exchange must already exist in order for the consuming queue to bind to Exchange, otherwise an error will be reported. The first time we run the program, we need to start the Sender before we can successfully start the Reciver.
5. Topic
The topic is also a persistent message, and as long as the switch is in place, each online consumer can consume a topic of interest once.
- * (asterisk) can replace a word.
- # (hash) can replace zero or more words.
Focus on the public account: Kylin bug, you can also get Java core knowledge mind map and Java core learning notes.
5.1.1, Sender. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Sender {
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_NAME, EXCHANGE_TYPE);
// Message content
String message = "This is Lubenweil Square.";
// The second argument is the topic match value
channel.basicPublish(EXCHANGE_NAME,"lbw.nb".null,message.getBytes());
System.out.println("[x] Sent '"+message+"'");
// Close the connectionchannel.close(); connection.close(); }}Copy the code
5.1.2, Receiver1. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver1 {
private final static String QUEUE_NAME = "queue_topic";
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
// The second parameter is to match the topic I am interested in
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.nb.*");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received1 '"+message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}}Copy the code
5.1.3, Receiver2. Java
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.QueueingConsumer;
import top.san.RabbitMq.util.RabbitmqConnectionUtil;
import java.io.IOException;
public class Receiver2 {
private final static String QUEUE_NAME = "queue_topic2";
private final static String EXCHANGE_NAME = "exchange_topic";
private final static String EXCHANGE_TYPE = "topic";
public static void main(String[] args) throws IOException, InterruptedException {
Connection connection = RabbitmqConnectionUtil.getConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, false.false.false.null);
// The second parameter is to match the topic I am interested in
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "lbw.#");
channel.basicQos(1);
QueueingConsumer consumer = new QueueingConsumer(channel);
channel.basicConsume(QUEUE_NAME, false, consumer);
while(true){
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
String message = new String(delivery.getBody());
System.out.println("[x] Received2 '"+message + "'");
Thread.sleep(10);
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }}}Copy the code
5.1.4 Results-analysis
The special feature of a topic is that queues can get messages about the topic they are interested in, and can match all strings of interest with the wildcard * or # character.
6. RPC (Remote Procedure Call)
To give you an idea of the process of implementing a call-back between two switches.
3. Switch for RabbitMq
RabbitMq has the concept of a switch. Messages are sent by the Client and forwarded to the corresponding queue by the switch. The Worker retrieves unread data processing from the queue. In this way, the sender of the message does not need to know the existence of the message user, and vice versa. Follow the public account: Kirin to change the bug, but also can obtain the Java core knowledge mind map and Java core learning notes.
Direct Exchange: directly connects to the switch and forwards messages to the queue specified by routigKey
Fanout exchange: a fan exchange that forwards messages to all bound queues (equivalent to broadcasting)
Topic exchange: Topic exchange, which forwards messages according to rules (flexible)
Headers Exchange: the first switch
BasicPublish (“”, QUEUE_NAME, null, message.getBytes()); It is the first argument to this method, which is empty to indicate that the default switch is used. Several exchange types are available: Direct, Topic, HEADERS, and FANout.
Xiaobian article to share here is over, like xiaobian share can like to share concerns oh, thank you for your support!