preface
I have been using Kafka before, but today is not Friday, I could fish again, the boss said, don’t fish this week, I want to refute, the boss said: Ok, you boy don’t push, you are just fishing, today you can tidy up mq things a little, next week’s communication meeting you host, I have to go on a business trip, you boy will give me something wrong, can you die, there is no way, can only be bitter to study, just don’t know what to write on the weekend, I will put RabbitMQ simple application, work queue mode, publish/subscribe mode, routing mode, wildcard mode with the source code listed, hoping to help you are learning
Note: Routing mode and wildcard mode can not be shown here due to space reasons, I will leave it for tomorrow to show, pay attention to me, a magical programming ape
I have not line, from work, looking for places to drink two cups, original is not easy, an afternoon not do other, sorting out, hope everybody attention + forward, thank you, give me a little support, have problem can contact me the comments section below, individual public number: Java architects alliance, maintain an original technology every day, gnome male -“
All right, knock off. Let’s get down to business
RabbitMQ (1) : Java simple operation RabbitMQ
Add POM dependencies
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
1234
Copy the code
Ii. Producers
Public class Send {// Define a message queue name private final static String QUEUE_NAME = "first-MQ "; public static void main(String[] args) throws IOException, TimeoutException {// Create a new connection ConnectionFactory Factory = new ConnectionFactory(); factory.setHost("106.**.**.82"); factory.setPort(5672); An MQ server can be configured with multiple virtual machines, each of which is equivalent to an independent mq factory.setVirtualhost ("/"); factory.setUsername("user"); factory.setPassword("pwd"); Connection connection = null; Channel channel = null; try{ connection = factory.newConnection(); Channel = connection.createchannel (); channel = connection.createchannel (); // Declare the switch, there is no queue here, if there is no queue, then create // parameter: String Queue, Boolean durable, Boolean EXCLUSIVE, Boolean autoDelete, Map<String, Object> arguments /** * queue: Queue name * durable: Persistent, stored in disks * EXCLUSIVE: Exclusive, exclusive connection. Access to a queue is only allowed in the Connection. If the connection is closed, the queue will be deleted. Set to true with EXCLUSIVE to implement temporary queues. QueueDeclare (QUEUE_NAME,false,false,false, null); String message = "HELLO WORLD"; // Send a message // parameters: String Exchange, String routingKey, BasicProperties props, byte[] body /** * Exchange: if not specified (""), the default switch is * routingKey: Routing key; The switch forwards the message to the specified queue based on the routingKey. If the default switch is used, the routingKey is the queue name * props: additional property * body: The message content * / channel. BasicPublish (" ", QUEUE_NAME, null, message. GetBytes ()); System.out.println("send message:"+message); } catch (TimeoutException | IOException e) { e.printStackTrace(); } finally {// Close the channel channel.close(); Connection.close (); }}}Copy the code
Iii. Consumers
public class Recv { private final static String QUEUE_NAME = "FIRST-MQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.*5.**.8*"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("user"); factory.setPassword("pwd"); try{ Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // Declare queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" Waiting for messages. "); Consumer = new DefaultConsumer(channel) {** * this method will be called when a message is received * @description: * @param consumerTag: (optional) consumerTag: identifies consumer, set channel while listening to queue. BasicConsume * @param properties: The properties of the message, @override public void handleDelivery(String consumerTag, Envelope, Envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println("receive:" + message); TimeUnit.SECONDS.sleep(1); // Message id envelope. GetDeliveryTag (); // Channel. BasicAck (envelope. GetDeliveryTag (),false); }}; // Parameters: String queue, Boolean autoAck, Consumer callback /** * queue: queue * autoAck: automatic reply: when the Consumer has received the message, it tells MQ that the message has been received. TRUE: automatic reply, false: programmatic reply * callback: method to be executed when the consumer receives the message. */ channel.basicConsume(QUEUE_NAME, true, consumer); TimeUnit.SECONDS.sleep(15); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); }}}Copy the code
Work Queue
The principle of
A message is received in turn
Features:
- A producer sends a message
- Multiple consumers listen for messages on a queue
- Messages cannot be re-consumed
- Polling is used to evenly send messages to consumers (C1, C2, C1, C2… Alternate)
The introduction of pom
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
1234
Copy the code
producers
Public class Send {// Queue name private final static String QUEUE_NAME = "first-mq "; public static void main(String[] args) throws IOException, TimeoutException {// Create a new connection ConnectionFactory Factory = new ConnectionFactory(); factory.setHost("106.**.**.82"); factory.setPort(5672); An MQ server can be configured with multiple virtual machines, each of which is equivalent to an independent mq factory.setVirtualhost ("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = null; Channel channel = null; try{ connection = factory.newConnection(); Channel = connection.createchannel (); channel = connection.createchannel (); // Create a queue with the default switch declaration. If there is no switch declaration, create a queue with the parameter: String Queue, Boolean durable, Boolean EXCLUSIVE, Boolean autoDelete, Map<String, Object> arguments /** * queue: Queue name * durable: Persistent, stored in disks * EXCLUSIVE: Exclusive, exclusive connection. Access to a queue is only allowed in the Connection. If the connection is closed, the queue will be deleted. Set to true with EXCLUSIVE to implement temporary queues. QueueDeclare (QUEUE_NAME,false,false,false, null); String message = "Work Queue mode, message coming." ; // Send a message // parameters: String Exchange, String routingKey, BasicProperties props, byte[] body /** * Exchange: if not specified (""), the default switch is * routingKey: Routing key; The switch forwards the message to the specified queue based on the routingKey. If the default switch is used, the routingKey is the queue name * props: additional property * body: The message content * / channel. BasicPublish (" ", QUEUE_NAME, null, message. GetBytes ()); System.out.println("send message:"+message); } catch (TimeoutException | IOException e) { e.printStackTrace(); } finally {// Close the channel channel.close(); Connection.close (); }}}Copy the code
Consumer 1
public class Recv { private final static String QUEUE_NAME = "FIRST-MQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.**.**.82"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); try{ Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // Declare queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages."); Consumer = new DefaultConsumer(channel) {** * this method will be called when a message is received * @description: * @param consumerTag: (optional) consumerTag: identifies consumer, set channel while listening to queue. BasicConsume * @param properties: The properties of the message, * @param Body: message content */ @sneakythrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println(" consumer 1 receive:" + message); TimeUnit.SECONDS.sleep(1); // Channel. BasicAck (envelope. GetDeliveryTag (),false); }}; // Parameters: String queue, Boolean autoAck, Consumer callback /** * queue: queue * autoAck: automatic reply: when the Consumer has received the message, it tells MQ that the message has been received. TRUE: automatic reply, false: programmatic reply * callback: method to be executed when the consumer receives the message. */ channel.basicConsume(QUEUE_NAME, true, consumer); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); }}}Copy the code
Consumer 2
public class Recv2 { private final static String QUEUE_NAME = "FIRST-MQ"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.**.**.82"); factory.setPort(5672); factory.setVirtualHost("/"); factory.setUsername("guest"); factory.setPassword("guest"); try{ Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); // Declare queue channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages."); Consumer = new DefaultConsumer(channel) {** * this method will be called when a message is received * @description: * @param consumerTag: (optional) consumerTag: identifies consumer, set channel while listening to queue. BasicConsume * @param properties: The properties of the message, * @param Body: message content */ @sneakythrows @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String(body, StandardCharsets.UTF_8); System.out.println(" consumer 2 receive:" + message); TimeUnit.SECONDS.sleep(1); // Channel. BasicAck (envelope. GetDeliveryTag (),false); }}; // Parameters: String queue, Boolean autoAck, Consumer callback /** * queue: queue * autoAck: automatic reply: when the Consumer has received the message, it tells MQ that the message has been received. TRUE: automatic reply, false: programmatic reply * callback: method to be executed when the consumer receives the message. */ channel.basicConsume(QUEUE_NAME, true, consumer); } catch (IOException | TimeoutException | InterruptedException e) { e.printStackTrace(); }}}Copy the code
Start two consumers, and then start producers a few more times to see the effect.
Publish/Subscribe
One message, multiple receives
Features:
- A producer sends a message to the switch
- There are multiple queues bound to the switch, and each consumer listens to its own queue
- The producer sends the message to the switch, which forwards the message to each queue bound to the switch, and each queue bound to the switch receives the message.
- If a message is sent to a switch that is not bound to a queue, it will be lost.
- It is more powerful than the work queue and can be viewed as multiple work queues.
The introduction of pom
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
</dependency>
1234
Copy the code
producers
// After registering, send SMS and email. Public class Producer {// Define two queues and one switch. Private final static String QUEUE_EMAIL = "QUEUE_EMAIL "; private final static String QUEUE_SMS = "queue_sms"; private final static String EXCHANGE_NAME = "exchange_fanout_1"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.**.**.82"); factory.setPort(5672); An MQ server can be configured with multiple virtual machines, each of which is equivalent to an independent mq factory.setVirtualhost ("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = null; Channel channel = null; try{ connection = factory.newConnection(); Channel = connection.createchannel (); channel = connection.createchannel (); 1. Switch name * 2. * FANOUT: publish/subscribe mode * DIRECT: Routing mode * TOPIC: HEADERS: Corresponding HEADERS work mode * / channel. ExchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType FANOUT); QueueDeclare (QUEUE_EMAIL,false,false,false, null); channel.queueDeclare(QUEUE_SMS,false,false,false,null); // Switch queue binding /** * 1 queue Queue name * 2 Exchange switch name * 3 routingKey Set this parameter to "*/" in the publish and subscribe mode channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,""); channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,""); String message = "Ok, SMS and EMAIL~~"; // Send a message // parameters: String Exchange, String routingKey, BasicProperties props, byte[] body /** * Exchange: if not specified (""), the default switch is * routingKey: Routing key; The switch forwards the message to the specified queue based on the routingKey. If the default switch is used, the routingKey is the queue name * props: additional property * body: BasicPublish (EXCHANGE_NAME,"", NULL,message.getBytes()); System.out.println("send message: "+message); } catch (TimeoutException e) { e.printStackTrace(); } catch (IOException e) { e.printStackTrace(); } finally { channel.close(); connection.close(); }}}Copy the code
Consumers receive SMS messages
/** * public class Consumer_SMS {private final static String QUEUE_EMAIL = "QUEUE_EMAIL "; private final static String QUEUE_SMS = "queue_sms"; private final static String EXCHANGE_NAME = "exchange_fanout_1"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.**.**.82"); factory.setPort(5672); An MQ server can be configured with multiple virtual machines, each of which is equivalent to an independent mq factory.setVirtualhost ("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = null; Channel channel = null; try{ connection = factory.newConnection(); Channel = connection.createchannel (); channel = connection.createchannel (); FANOUT: indicates publish/subscribe mode * DIRECT: indicates the route working mode * TOPIC: indicates the route working mode * HEADERS: Corresponding HEADERS work mode * / channel. ExchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType FANOUT); QueueDeclare (QUEUE_EMAIL,false,false,false, null); channel.queueDeclare(QUEUE_SMS,false,false,false,null); // Switch queue binding /** * 1 queue Queue name * 2 Exchange switch name * 3 routingKey Set this parameter to "*/ // channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,""); channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String (body, StandardCharsets.UTF_8); System.out.println("SMS receive:" + message); }}; // Parameters: String queue, Boolean autoAck, Consumer callback /** * queue: queue * autoAck: automatic reply: when the Consumer has received the message, it tells MQ that the message has been received. TRUE: automatic reply, false: programmatic reply * callback: method to be executed when the consumer receives the message. */ channel.basicConsume(QUEUE_SMS,true,consumer); } catch (TimeoutException | IOException e) { e.printStackTrace(); }}}Copy the code
Consumers receive mail
public class Consumer_EMAIL { private final static String QUEUE_EMAIL = "queue_email"; private final static String QUEUE_SMS = "queue_sms"; private final static String EXCHANGE_NAME = "exchange_fanout_1"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.**.**.82"); factory.setPort(5672); An MQ server can be configured with multiple virtual machines, each of which is equivalent to an independent mq factory.setVirtualhost ("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = null; Channel channel = null; try{ connection = factory.newConnection(); Channel = connection.createchannel (); channel = connection.createchannel (); FANOUT: indicates publish/subscribe mode * DIRECT: indicates the route working mode * TOPIC: indicates the route working mode * HEADERS: Corresponding HEADERS work mode * / channel. ExchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType FANOUT); // Declare a queue, if not, create channel.queueDeclare(QUEUE_EMAIL,false,false,false, null); // Switch queue binding /** * 1 queue Queue name * 2 Exchange switch name * 3 routingKey Set this parameter to "*/" in the publish and subscribe mode channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String (body, StandardCharsets.UTF_8); System.out.println("Email receive:" + message); }}; // Parameters: String queue, Boolean autoAck, Consumer callback /** * queue: queue * autoAck: automatic reply: when the Consumer has received the message, it tells MQ that the message has been received. TRUE: automatic reply, false: programmatic reply * callback: method to be executed when the consumer receives the message. */ channel.basicConsume(QUEUE_EMAIL,true,consumer); } catch (TimeoutException | IOException e) { e.printStackTrace(); }}}Copy the code
Added: Combined with the work queue pattern
In fact, as long as the consumer [C1] side by side add a [C3], C1 and C3 share the same queue
Public class Consumer_SMS_2 {private final static String QUEUE_EMAIL = "QUEUE_EMAIL "; private final static String QUEUE_SMS = "queue_sms"; private final static String EXCHANGE_NAME = "exchange_fanout_1"; public static void main(String[] args) { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("106.**.**.82"); factory.setPort(5672); An MQ server can be configured with multiple virtual machines, each of which is equivalent to an independent mq factory.setVirtualhost ("/"); factory.setUsername("guest"); factory.setPassword("guest"); Connection connection = null; Channel channel = null; try{ connection = factory.newConnection(); Channel = connection.createchannel (); channel = connection.createchannel (); FANOUT: indicates publish/subscribe mode * DIRECT: indicates the route working mode * TOPIC: indicates the route working mode * HEADERS: Corresponding HEADERS work mode * / channel. ExchangeDeclare (EXCHANGE_NAME, BuiltinExchangeType FANOUT); QueueDeclare (QUEUE_EMAIL,false,false,false, null); channel.queueDeclare(QUEUE_SMS,false,false,false,null); // Switch queue binding /** * 1 queue Queue name * 2 Exchange switch name * 3 routingKey Set this parameter to "*/ // channel.queueBind(QUEUE_EMAIL,EXCHANGE_NAME,""); channel.queueBind(QUEUE_SMS,EXCHANGE_NAME,""); DefaultConsumer consumer = new DefaultConsumer(channel){ @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String message = new String (body, StandardCharsets.UTF_8); System.out.println("SMS_2 receive:" + message); }}; // Parameters: String queue, Boolean autoAck, Consumer callback /** * queue: queue * autoAck: automatic reply: when the Consumer has received the message, it tells MQ that the message has been received. TRUE: automatic reply, false: programmatic reply * callback: method to be executed when the consumer receives the message. */ channel.basicConsume(QUEUE_SMS,true,consumer); } catch (TimeoutException | IOException e) { e.printStackTrace(); }}}Copy the code