This is the fifth day of my participation in Gwen Challenge

Work message model

Distributing tasks among workers (competing consumers pattern)

In the last post, we had one producer and one consumer. In this post, we create a work queue to distribute tasks among multiple consumers.

Work queues, also known as task queues, let us avoid running a resource-intensive task immediately and having to wait for it to finish. Instead, the producer sends messages to a queue, and consumers take messages from the queue and process them. Multiple consumers run in parallel, but each message is consumed only once.
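The "consumed only once" behaviour can be illustrated without a broker. The following is a minimal sketch, assuming an in-memory `BlockingQueue` stands in for the RabbitMQ queue; the class name `CompetingConsumersSketch` and the method `run` are hypothetical and not part of the RabbitMQ client.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class CompetingConsumersSketch {

    /** Runs the given number of worker threads against one shared queue
     *  and returns how many messages each worker handled. */
    public static int[] run(int messages, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < messages; i++) {
            queue.add(i);
        }

        int[] handled = new int[consumers];
        Thread[] workers = new Thread[consumers];
        for (int c = 0; c < consumers; c++) {
            final int id = c;
            workers[c] = new Thread(() -> {
                // poll() removes the head atomically, so no two workers get the same message
                while (queue.poll() != null) {
                    handled[id]++; // each worker only writes its own slot
                }
            });
            workers[c].start();
        }
        for (Thread w : workers) {
            try {
                w.join(); // wait for every worker before reading the counts
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return handled;
    }

    public static void main(String[] args) {
        int[] handled = run(50, 2);
        System.out.println("C1 handled " + handled[0] + ", C2 handled " + handled[1]);
    }
}
```

How the 50 messages are split between the two workers varies from run to run, but the two counts always add up to 50, because every message is taken by exactly one worker.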

P: the producer, responsible for producing messages
C1: consumer 1
C2: consumer 2

C1 processes messages a little faster than C2

Producer

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class Producer_HelloWorld {
    public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set connection parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        // 3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // 4. Create a channel
        Channel channel = connection.createChannel();
        // 5. Declare the queue (durable, not exclusive, not auto-deleted, no extra arguments)
        channel.queueDeclare("hello_world", true, false, false, null);
        // 6. Send 50 messages via the default exchange, routed by queue name
        for (int i = 0; i < 50; i++) {
            String body = "hello RabbitMQ, message " + i;
            channel.basicPublish("", "hello_world", null, body.getBytes());
            System.out.println("Producer sends message: " + body);
        }
        // 7. Release resources
        channel.close();
        connection.close();
    }
}

Consumer 1

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class c1 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set connection parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        // 3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // 4. Create a channel
        Channel channel = connection.createChannel();
        // 5. Declare the queue (same arguments as the producer)
        channel.queueDeclare("hello_world", true, false, false, null);
        // 6. Receive messages
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C1 consumed message: " + new String(body));
                // Acknowledge manually (multiple = false: ack only this message)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // autoAck = false, so the broker waits for the manual ack above
        channel.basicConsume("hello_world", false, consumer);
    }
}

Consumer 2

import com.rabbitmq.client.*;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class c2 {
    public static void main(String[] args) throws IOException, TimeoutException {
        // 1. Create the connection factory
        ConnectionFactory connectionFactory = new ConnectionFactory();
        // 2. Set connection parameters
        connectionFactory.setHost("192.168.145.3");
        connectionFactory.setPort(5672);
        connectionFactory.setVirtualHost("/zhaojin");
        connectionFactory.setUsername("zhaojin");
        connectionFactory.setPassword("zhaojin");
        // 3. Create a connection
        Connection connection = connectionFactory.newConnection();
        // 4. Create a channel
        Channel channel = connection.createChannel();
        // 5. Declare the queue (same arguments as the producer)
        channel.queueDeclare("hello_world", true, false, false, null);
        // 6. Receive messages
        Consumer consumer = new DefaultConsumer(channel) {
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                System.out.println("C2 consumed message: " + new String(body));
                try {
                    // Simulate slow processing: 100 ms per message
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                // Acknowledge manually (multiple = false: ack only this message)
                channel.basicAck(envelope.getDeliveryTag(), false);
            }
        };
        // autoAck = false, so the broker waits for the manual ack above
        channel.basicConsume("hello_world", false, consumer);
    }
}

Consumer 2 sleeps for 100 ms per message (the Thread.sleep(100) call above) to simulate slower processing. Start c1 and c2 at the same time, then start the producer to send 50 messages, and watch what the consoles print.

Each consumer received 25 messages, so the tasks were distributed, but consumer 1 finished its share quickly and then sat idle while consumer 2 was still working. An even split is not what we want here: the consumer that processes messages faster should process more of them. To get that, we can tell RabbitMQ not to deliver more than one message to a consumer at a time. The broker then does not send a new message to a consumer that is still busy until that consumer has acked the previous one, and dispatches the message to a consumer that is free instead.

        // Deliver at most one unacknowledged message to this consumer at a time.
        // Call this on the consumer's channel before basicConsume.
        channel.basicQos(1);

Running again, you can see that the faster consumer (C1) handles most of the work.
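To see why the split changes, here is a small simulation of the two dispatch strategies. It is only a sketch under stated assumptions, not RabbitMQ's actual scheduler: time is simulated rather than measured, C2 takes 100 ms per message as in the code above, and C1 is assumed to take 10 ms per message (the post does not give a figure for C1). The class `DispatchSketch` and its methods are hypothetical names.

```java
public class DispatchSketch {

    /** Round-robin: message i goes to consumer i % n, however busy that consumer is. */
    public static int[] roundRobin(int messages, int consumers) {
        int[] handled = new int[consumers];
        for (int i = 0; i < messages; i++) {
            handled[i % consumers]++;
        }
        return handled;
    }

    /**
     * Prefetch of 1: the next message goes to whichever consumer is free first,
     * i.e. has acked its previous message. costMs[c] is the assumed time
     * consumer c needs per message. Ties go to the lower index.
     */
    public static int[] prefetchOne(int messages, long[] costMs) {
        int[] handled = new int[costMs.length];
        long[] freeAt = new long[costMs.length]; // simulated time at which each consumer is idle again
        for (int i = 0; i < messages; i++) {
            int next = 0;
            for (int c = 1; c < costMs.length; c++) {
                if (freeAt[c] < freeAt[next]) {
                    next = c;
                }
            }
            handled[next]++;
            freeAt[next] += costMs[next];
        }
        return handled;
    }

    public static void main(String[] args) {
        long[] costMs = {10, 100}; // assumption: C1 = 10 ms, C2 = 100 ms per message
        int[] rr = roundRobin(50, 2);
        int[] fair = prefetchOne(50, costMs);
        System.out.println("round-robin: C1=" + rr[0] + " C2=" + rr[1]);
        System.out.println("prefetch=1:  C1=" + fair[0] + " C2=" + fair[1]);
    }
}
```

With these assumed timings, round-robin gives each consumer 25 messages, while the prefetch-of-1 model gives C1 45 messages and C2 only 5. The exact numbers on a real broker depend on actual processing and network times.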