This is the 31st day of my participation in the August More Text Challenge
Asynchronous
MQ can be used for asynchronous operations.
Take restaurant orders as an example.
Synchronous: the restaurant accepts an order and notifies the chef, and the order request only completes once the chef has confirmed it. The restaurant cannot accept the next order until the chef has acknowledged the current one, so a burst of orders piles up.
Asynchronous: the restaurant receives the order, sends a message to RabbitMQ, and the order request is complete. The chef receives the order through RabbitMQ and returns an ACK to confirm the message.
The restaurant can keep accepting requests without waiting for the chef's confirmation. A message in RabbitMQ can be treated as a task that will be completed, just not immediately.
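To make the asynchronous flow concrete, here is a minimal sketch that needs no broker. It is not RabbitMQ code: a `LinkedBlockingQueue` stands in for the RabbitMQ queue, and the class name `AsyncOrderSketch` and the `STOP` sentinel are made up for illustration. The restaurant enqueues each order and moves on, while a chef thread works through the queue at its own pace.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// In-memory stand-in: the BlockingQueue plays the role of the RabbitMQ queue.
class AsyncOrderSketch {
    // Hypothetical sentinel that tells the chef thread to stop.
    static final String STOP = "STOP";

    // Accepts all orders without waiting for the chef and returns what the chef cooked.
    static List<String> run(List<String> orders) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> cooked = new ArrayList<>();

        // Chef side: take orders one by one until the sentinel arrives.
        Thread chef = new Thread(() -> {
            try {
                for (String order = queue.take(); !order.equals(STOP); order = queue.take()) {
                    cooked.add(order);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        chef.start();

        // Restaurant side: enqueue and return at once; add() does not block on the chef.
        for (String order : orders) {
            queue.add(order);
        }
        queue.add(STOP);

        try {
            chef.join(); // only so the sketch can return the result
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return cooked;
    }

    public static void main(String[] args) {
        System.out.println(run(Arrays.asList("noodles", "rice", "soup")));
        // prints [noodles, rice, soup]
    }
}
```

The loop that accepts orders never waits for the chef. That is the property the restaurant gains by publishing to a queue instead of calling the chef directly.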
Performance
RabbitMQ dispatches the messages of a queue to its consumers in round-robin fashion, which spreads requests evenly across them. When there are not enough chef servers, add chef servers; when there are not enough restaurant servers, add restaurant servers. Each bottleneck can be scaled on its own.
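Round-robin dispatch itself is simple to model. The sketch below is an illustration only, not broker code: the class `RoundRobinSketch` and its `dispatch` method are hypothetical names. It assigns message `i` to consumer `i % consumerCount`. A real broker also takes prefetch limits and acknowledgements into account, which this sketch ignores.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class RoundRobinSketch {
    // Returns one list per consumer; message i goes to consumer i % consumerCount.
    static List<List<String>> dispatch(List<String> messages, int consumerCount) {
        List<List<String>> perConsumer = new ArrayList<>();
        for (int i = 0; i < consumerCount; i++) {
            perConsumer.add(new ArrayList<>());
        }
        for (int i = 0; i < messages.size(); i++) {
            perConsumer.get(i % consumerCount).add(messages.get(i));
        }
        return perConsumer;
    }

    public static void main(String[] args) {
        List<String> orders = Arrays.asList("o1", "o2", "o3", "o4", "o5");
        System.out.println(dispatch(orders, 2)); // prints [[o1, o3, o5], [o2, o4]]
        System.out.println(dispatch(orders, 3)); // prints [[o1, o4], [o2, o5], [o3]]
    }
}
```

Going from two chefs to three lowers the per-chef load without touching the code that sends the orders.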
Decoupling
The two pieces of logic are separated: the restaurant logic no longer depends on the chef logic to run, and they become two independent applications.
Extensibility
If we want to add other logic (such as invoicing), we do not need to change the original code at all; we only include whatever extra information is needed in the message when we send it. The invoicing logic for orders can be added by declaring an additional queue in RabbitMQ and writing an invoicing consumer for it.
The new logic can also be written in a different language.
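A small model of how that extension works, under one assumption the text does not state: the orders go through a fanout-style exchange, so every bound queue gets its own copy of each message. The class `FanoutSketch` and the queue names `chef` and `invoice` are hypothetical, and the queues are plain in-memory collections rather than RabbitMQ queues.

```java
import java.util.ArrayDeque;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Queue;

// In-memory model of a fanout exchange: each bound queue receives a copy of every message.
class FanoutSketch {
    private final Map<String, Queue<String>> boundQueues = new LinkedHashMap<>();

    // Binding a new queue is the only step needed to add a new kind of consumer.
    Queue<String> bind(String queueName) {
        return boundQueues.computeIfAbsent(queueName, name -> new ArrayDeque<>());
    }

    // The publisher does not know, or care, which queues are bound.
    void publish(String message) {
        for (Queue<String> queue : boundQueues.values()) {
            queue.add(message);
        }
    }

    public static void main(String[] args) {
        FanoutSketch orders = new FanoutSketch();
        Queue<String> chef = orders.bind("chef");
        orders.publish("order-1");

        // Invoicing is added later; publish() is untouched.
        Queue<String> invoice = orders.bind("invoice");
        orders.publish("order-2");

        System.out.println(chef);    // prints [order-1, order-2]
        System.out.println(invoice); // prints [order-2]
    }
}
```

The invoicing queue only sees messages published after it was bound, and the publishing side never changes.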
Implementing RPC
A typical RPC implementation couples the caller's code tightly to the server's code, and the caller has to wait for the server to return a result before it can carry on.
To implement RPC with RabbitMQ, the caller only needs to publish a message, which is routed to the appropriate queue through a binding. The service provider subscribes to that queue and consumes the messages to do the actual work. RabbitMQ takes care of load balancing across service instances and of redelivering unacknowledged messages when an instance crashes, without manual configuration by the developer.
But how do you get the result from the server back to the caller?
AMQP messaging is one-way. How does the consumer send a result back to the producer?
By using a message as the reply: the consumer publishes a reply message back to the producer.
1. Every AMQP message carries a reply_to property. The producer creates the reply queue in advance, subscribes to it, sets reply_to to that queue's name, and waits for the reply message.
2. The consumer reads the name of the reply queue from the reply_to property of the message and publishes the reply to that queue through the default exchange, using the queue name as the routing key.
3. The producer consumes the reply message from the reply queue.
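The three steps above can be sketched without a broker. This is an in-memory illustration only: a map from queue name to `BlockingQueue` stands in for the default exchange (which routes by queue name), and the names `RpcSketch`, `Message`, `serveOne` and `call` are hypothetical. The queue names are the ones used in the code later in this post. The sketch also matches replies by correlation id, a property the later code sets as well.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class RpcSketch {
    // A message with the two properties the pattern relies on.
    static final class Message {
        final String replyTo;
        final String correlationId;
        final String body;

        Message(String replyTo, String correlationId, String body) {
            this.replyTo = replyTo;
            this.correlationId = correlationId;
            this.body = body;
        }
    }

    // Queue name -> queue. Stands in for the default exchange, which routes by queue name.
    static final Map<String, BlockingQueue<Message>> QUEUES = new ConcurrentHashMap<>();

    static BlockingQueue<Message> queue(String name) {
        return QUEUES.computeIfAbsent(name, n -> new LinkedBlockingQueue<>());
    }

    // Server: handle one request and publish the reply to the queue named in replyTo.
    static void serveOne() {
        try {
            Message request = queue("RPCService").take();
            String result = request.body + "->server";
            queue(request.replyTo).add(new Message(null, request.correlationId, result));
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Client: create the reply queue, publish the request, wait for the matching reply.
    static String call(String body) {
        String replyQueue = "RPCClient";
        BlockingQueue<Message> replies = queue(replyQueue);   // step 1: reply queue exists first
        String correlationId = UUID.randomUUID().toString();
        queue("RPCService").add(new Message(replyQueue, correlationId, body));
        try {
            while (true) {
                Message reply = replies.poll(5, TimeUnit.SECONDS); // step 3: consume the reply
                if (reply == null) {
                    throw new IllegalStateException("timed out waiting for reply");
                }
                if (correlationId.equals(reply.correlationId)) {
                    return reply.body;
                }
                // A reply with another correlation id belongs to a different request; skip it.
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        new Thread(RpcSketch::serveOne).start();
        System.out.println(call("client")); // prints client->server
    }
}
```

The server never hard-codes where to send the result. It only follows the `replyTo` value of the request, which is what lets many different callers share one service queue.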
Implementation
Producer
1. Create a connection and channel
2. Create the reply queue and subscribe to it
3. Set the message properties (reply_to) and send the message

```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public void RPC() throws IOException, TimeoutException {
    final String RPC_SERVICE_QUEUE = "RPCService";
    final String RPC_CLIENT_QUEUE = "RPCClient";

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();

    // Create and subscribe to the reply queue before sending the request,
    // so the reply cannot arrive while the queue does not exist yet
    channel.queueDeclare(RPC_CLIENT_QUEUE, false, false, false, null);
    Consumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            System.out.println("corId" + properties.getCorrelationId());
            String s = new String(body, "UTF-8");
            System.out.println("message" + s);
            try {
                // Acknowledge the reply so the broker removes it from the queue
                channel.basicAck(envelope.getDeliveryTag(), false);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    channel.basicConsume(RPC_CLIENT_QUEUE, false, consumer);

    // Set the message properties (reply_to and correlation id) and send the request
    AMQP.BasicProperties prop = new AMQP.BasicProperties.Builder()
            .replyTo(RPC_CLIENT_QUEUE)
            .correlationId("22")
            .build();
    channel.basicPublish("", RPC_SERVICE_QUEUE, prop, "client".getBytes());

    // channel.close();
    // connection.close();
}
```
Consumer
1. Create a connection and channel
2. Create the service queue and subscribe to it
3. Process each received message and send the result to the reply queue
```java
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public void RPC() throws IOException, TimeoutException {
    final String RPC_SERVICE_QUEUE = "RPCService";

    ConnectionFactory connectionFactory = new ConnectionFactory();
    connectionFactory.setHost("localhost");
    Connection connection = connectionFactory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(RPC_SERVICE_QUEUE, false, false, false, null);
    DefaultConsumer consumer = new DefaultConsumer(channel) {
        @Override
        public void handleDelivery(String consumerTag, Envelope envelope,
                                   AMQP.BasicProperties properties, byte[] body) throws IOException {
            // Reply to the queue named in the request and echo its correlation id
            String replyTo = properties.getReplyTo();
            AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                    .correlationId(properties.getCorrelationId())
                    .build();
            System.out.println("corId" + properties.getCorrelationId());
            String s = new String(body, "UTF-8");
            System.out.println("message" + s);
            try {
                channel.basicPublish("", replyTo, replyProps, (s + "->client").getBytes());
                // Acknowledge the request so the broker removes it from the queue
                channel.basicAck(envelope.getDeliveryTag(), false);
                // channel.basicNack(envelope.getDeliveryTag(), false, true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    };
    // autoAck must be false because the handler acknowledges manually
    channel.basicConsume(RPC_SERVICE_QUEUE, false, consumer);

    // channel.close();
    // connection.close();
}
```